• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.94
/protocols/dcutr/src/handler/relayed.rs
1
// Copyright 2021 Protocol Labs.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a
4
// copy of this software and associated documentation files (the "Software"),
5
// to deal in the Software without restriction, including without limitation
6
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7
// and/or sell copies of the Software, and to permit persons to whom the
8
// Software is furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in
11
// all copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19
// DEALINGS IN THE SOFTWARE.
20

21
//! [`ConnectionHandler`] handling relayed connection potentially upgraded to a direct connection.
22

23
use std::{
24
    collections::VecDeque,
25
    io,
26
    task::{Context, Poll},
27
    time::Duration,
28
};
29

30
use either::Either;
31
use futures::future;
32
use libp2p_core::{
33
    multiaddr::Multiaddr,
34
    upgrade::{DeniedUpgrade, ReadyUpgrade},
35
    ConnectedPoint,
36
};
37
use libp2p_swarm::{
38
    handler::{
39
        ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
40
        ListenUpgradeError,
41
    },
42
    ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError,
43
    SubstreamProtocol,
44
};
45
use protocol::{inbound, outbound};
46

47
use crate::{behaviour::MAX_NUMBER_OF_UPGRADE_ATTEMPTS, protocol, PROTOCOL_NAME};
48

49
/// Instructions the behaviour can hand to a relayed-connection [`Handler`].
#[derive(Debug)]
pub enum Command {
    /// Ask the handler to request an outbound substream for the DCUtR protocol.
    Connect,
}
53

54
#[derive(Debug)]
55
pub enum Event {
56
    InboundConnectNegotiated { remote_addrs: Vec<Multiaddr> },
57
    OutboundConnectNegotiated { remote_addrs: Vec<Multiaddr> },
58
    InboundConnectFailed { error: inbound::Error },
59
    OutboundConnectFailed { error: outbound::Error },
60
}
61

62
pub struct Handler {
63
    endpoint: ConnectedPoint,
64
    /// Queue of events to return when polled.
65
    queued_events: VecDeque<
66
        ConnectionHandlerEvent<
67
            <Self as ConnectionHandler>::OutboundProtocol,
68
            (),
69
            <Self as ConnectionHandler>::ToBehaviour,
70
        >,
71
    >,
72

73
    // Inbound DCUtR handshakes
74
    inbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, inbound::Error>>,
75

76
    // Outbound DCUtR handshake.
77
    outbound_stream: futures_bounded::FuturesSet<Result<Vec<Multiaddr>, outbound::Error>>,
78

79
    /// The addresses we will send to the other party for hole-punching attempts.
80
    holepunch_candidates: Vec<Multiaddr>,
81

82
    attempts: u8,
83
}
84

85
impl Handler {
86
    pub fn new(endpoint: ConnectedPoint, holepunch_candidates: Vec<Multiaddr>) -> Self {
2✔
87
        Self {
2✔
88
            endpoint,
2✔
89
            queued_events: Default::default(),
2✔
90
            inbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
2✔
91
            outbound_stream: futures_bounded::FuturesSet::new(Duration::from_secs(10), 1),
2✔
92
            holepunch_candidates,
2✔
93
            attempts: 0,
2✔
94
        }
2✔
95
    }
2✔
96

97
    fn on_fully_negotiated_inbound(
1✔
98
        &mut self,
1✔
99
        FullyNegotiatedInbound {
1✔
100
            protocol: output, ..
1✔
101
        }: FullyNegotiatedInbound<<Self as ConnectionHandler>::InboundProtocol>,
1✔
102
    ) {
1✔
103
        match output {
1✔
104
            future::Either::Left(stream) => {
1✔
105
                if self
1✔
106
                    .inbound_stream
1✔
107
                    .try_push(inbound::handshake(
1✔
108
                        stream,
1✔
109
                        self.holepunch_candidates.clone(),
1✔
110
                    ))
1✔
111
                    .is_err()
1✔
112
                {
113
                    tracing::warn!(
×
114
                        "New inbound connect stream while still upgrading previous one. Replacing previous with new.",
×
115
                    );
116
                }
1✔
117
                self.attempts += 1;
1✔
118
            }
119
            // A connection listener denies all incoming substreams, thus none can ever be fully
120
            // negotiated.
121
            future::Either::Right(output) => libp2p_core::util::unreachable(output),
122
        }
123
    }
1✔
124

125
    fn on_fully_negotiated_outbound(
1✔
126
        &mut self,
1✔
127
        FullyNegotiatedOutbound {
1✔
128
            protocol: stream, ..
1✔
129
        }: FullyNegotiatedOutbound<<Self as ConnectionHandler>::OutboundProtocol>,
1✔
130
    ) {
1✔
131
        assert!(
1✔
132
            self.endpoint.is_listener(),
1✔
133
            "A connection dialer never initiates a connection upgrade."
×
134
        );
135
        if self
1✔
136
            .outbound_stream
1✔
137
            .try_push(outbound::handshake(
1✔
138
                stream,
1✔
139
                self.holepunch_candidates.clone(),
1✔
140
            ))
1✔
141
            .is_err()
1✔
142
        {
143
            tracing::warn!(
×
144
                "New outbound connect stream while still upgrading previous one. Replacing previous with new.",
×
145
            );
146
        }
1✔
147
    }
1✔
148

149
    fn on_listen_upgrade_error(
×
150
        &mut self,
×
151
        ListenUpgradeError { error, .. }: ListenUpgradeError<
×
152
            (),
×
153
            <Self as ConnectionHandler>::InboundProtocol,
×
154
        >,
×
155
    ) {
×
156
        libp2p_core::util::unreachable(error.into_inner());
×
157
    }
158

159
    fn on_dial_upgrade_error(
×
160
        &mut self,
×
161
        DialUpgradeError { error, .. }: DialUpgradeError<
×
162
            (),
×
163
            <Self as ConnectionHandler>::OutboundProtocol,
×
164
        >,
×
165
    ) {
×
166
        let error = match error {
×
167
            StreamUpgradeError::Apply(v) => libp2p_core::util::unreachable(v),
×
168
            StreamUpgradeError::NegotiationFailed => outbound::Error::Unsupported,
×
169
            StreamUpgradeError::Io(e) => outbound::Error::Io(e),
×
170
            StreamUpgradeError::Timeout => outbound::Error::Io(io::ErrorKind::TimedOut.into()),
×
171
        };
172

173
        self.queued_events
×
174
            .push_back(ConnectionHandlerEvent::NotifyBehaviour(
×
175
                Event::OutboundConnectFailed { error },
×
176
            ))
×
177
    }
×
178
}
179

180
impl ConnectionHandler for Handler {
181
    type FromBehaviour = Command;
182
    type ToBehaviour = Event;
183
    type InboundProtocol = Either<ReadyUpgrade<StreamProtocol>, DeniedUpgrade>;
184
    type OutboundProtocol = ReadyUpgrade<StreamProtocol>;
185
    type OutboundOpenInfo = ();
186
    type InboundOpenInfo = ();
187

188
    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
27✔
189
        match self.endpoint {
27✔
190
            ConnectedPoint::Dialer { .. } => {
191
                SubstreamProtocol::new(Either::Left(ReadyUpgrade::new(PROTOCOL_NAME)), ())
13✔
192
            }
193
            ConnectedPoint::Listener { .. } => {
194
                // By the protocol specification the listening side of a relayed connection
195
                // initiates the _direct connection upgrade_. In other words the listening side of
196
                // the relayed connection opens a substream to the dialing side. (Connection roles
197
                // and substream roles are reversed.) The listening side on a relayed connection
198
                // never expects incoming substreams, hence the denied upgrade below.
199
                SubstreamProtocol::new(Either::Right(DeniedUpgrade), ())
14✔
200
            }
201
        }
202
    }
27✔
203

204
    fn on_behaviour_event(&mut self, event: Self::FromBehaviour) {
1✔
205
        match event {
1✔
206
            Command::Connect => {
1✔
207
                self.queued_events
1✔
208
                    .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
1✔
209
                        protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()),
1✔
210
                    });
1✔
211
                self.attempts += 1;
1✔
212
            }
1✔
213
        }
214
    }
1✔
215

216
    fn connection_keep_alive(&self) -> bool {
6✔
217
        if self.attempts < MAX_NUMBER_OF_UPGRADE_ATTEMPTS {
6✔
218
            return true;
6✔
219
        }
×
220

221
        false
×
222
    }
6✔
223

224
    #[tracing::instrument(level = "trace", name = "ConnectionHandler::poll", skip(self, cx))]
225
    fn poll(
43✔
226
        &mut self,
43✔
227
        cx: &mut Context<'_>,
43✔
228
    ) -> Poll<ConnectionHandlerEvent<Self::OutboundProtocol, (), Self::ToBehaviour>> {
43✔
229
        // Return queued events.
230
        if let Some(event) = self.queued_events.pop_front() {
43✔
231
            return Poll::Ready(event);
1✔
232
        }
42✔
233

234
        match self.inbound_stream.poll_unpin(cx) {
42✔
235
            Poll::Ready(Ok(Ok(addresses))) => {
1✔
236
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1✔
237
                    Event::InboundConnectNegotiated {
1✔
238
                        remote_addrs: addresses,
1✔
239
                    },
1✔
240
                ))
1✔
241
            }
242
            Poll::Ready(Ok(Err(error))) => {
×
243
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
244
                    Event::InboundConnectFailed { error },
×
245
                ))
×
246
            }
247
            Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
248
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
249
                    Event::InboundConnectFailed {
×
250
                        error: inbound::Error::Io(io::ErrorKind::TimedOut.into()),
×
251
                    },
×
252
                ))
×
253
            }
254
            Poll::Pending => {}
41✔
255
        }
256

257
        match self.outbound_stream.poll_unpin(cx) {
41✔
258
            Poll::Ready(Ok(Ok(addresses))) => {
1✔
259
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1✔
260
                    Event::OutboundConnectNegotiated {
1✔
261
                        remote_addrs: addresses,
1✔
262
                    },
1✔
263
                ))
1✔
264
            }
265
            Poll::Ready(Ok(Err(error))) => {
×
266
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
267
                    Event::OutboundConnectFailed { error },
×
268
                ))
×
269
            }
270
            Poll::Ready(Err(futures_bounded::Timeout { .. })) => {
271
                return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
×
272
                    Event::OutboundConnectFailed {
×
273
                        error: outbound::Error::Io(io::ErrorKind::TimedOut.into()),
×
274
                    },
×
275
                ))
×
276
            }
277
            Poll::Pending => {}
40✔
278
        }
279

280
        Poll::Pending
40✔
281
    }
43✔
282

283
    fn on_connection_event(
6✔
284
        &mut self,
6✔
285
        event: ConnectionEvent<Self::InboundProtocol, Self::OutboundProtocol>,
6✔
286
    ) {
6✔
287
        match event {
6✔
288
            ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
1✔
289
                self.on_fully_negotiated_inbound(fully_negotiated_inbound)
1✔
290
            }
291
            ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
1✔
292
                self.on_fully_negotiated_outbound(fully_negotiated_outbound)
1✔
293
            }
294
            ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
×
295
                self.on_listen_upgrade_error(listen_upgrade_error)
×
296
            }
297
            ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
×
298
                self.on_dial_upgrade_error(dial_upgrade_error)
×
299
            }
300
            _ => {}
4✔
301
        }
302
    }
6✔
303
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc