• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

butlergroup / rust-libp2p / 18610913338

18 Oct 2025 04:41AM UTC coverage: 78.379% (+2.5%) from 75.842%
18610913338

push

github

butlergroup
	modified:   .github/workflows/ci.yml

36944 of 47135 relevant lines covered (78.38%)

37728.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.16
/swarm/src/connection/pool/task.rs
1
// Copyright 2021 Protocol Labs.
2
// Copyright 2018 Parity Technologies (UK) Ltd.
3
//
4
// Permission is hereby granted, free of charge, to any person obtaining a
5
// copy of this software and associated documentation files (the "Software"),
6
// to deal in the Software without restriction, including without limitation
7
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
8
// and/or sell copies of the Software, and to permit persons to whom the
9
// Software is furnished to do so, subject to the following conditions:
10
//
11
// The above copyright notice and this permission notice shall be included in
12
// all copies or substantial portions of the Software.
13
//
14
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
20
// DEALINGS IN THE SOFTWARE.
21

22
//! Async functions driving pending and established connections in the form of a task.
23

24
use std::{convert::Infallible, pin::Pin};
25

26
use futures::{
27
    channel::{mpsc, oneshot},
28
    future::{poll_fn, Either, Future},
29
    SinkExt, StreamExt,
30
};
31
use libp2p_core::muxing::StreamMuxerBox;
32

33
use super::concurrent_dial::ConcurrentDial;
34
use crate::{
35
    connection::{
36
        self, ConnectionError, ConnectionId, PendingInboundConnectionError,
37
        PendingOutboundConnectionError,
38
    },
39
    transport::TransportError,
40
    ConnectionHandler, Multiaddr, PeerId,
41
};
42

43
/// Commands that can be sent to a task driving an established connection.
44
#[derive(Debug)]
45
pub(crate) enum Command<T> {
46
    /// Notify the connection handler of an event.
47
    NotifyHandler(T),
48
    /// Gracefully close the connection (active close) before
49
    /// terminating the task.
50
    Close,
51
}
52

53
pub(crate) enum PendingConnectionEvent {
54
    ConnectionEstablished {
55
        id: ConnectionId,
56
        output: (PeerId, StreamMuxerBox),
57
        /// [`Some`] when the new connection is an outgoing connection.
58
        /// Addresses are dialed in parallel. Contains the addresses and errors
59
        /// of dial attempts that failed before the one successful dial.
60
        outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
61
    },
62
    /// A pending connection failed.
63
    PendingFailed {
64
        id: ConnectionId,
65
        error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
66
    },
67
}
68

69
#[derive(Debug)]
70
pub(crate) enum EstablishedConnectionEvent<ToBehaviour> {
71
    /// A node we are connected to has changed its address.
72
    AddressChange {
73
        id: ConnectionId,
74
        peer_id: PeerId,
75
        new_address: Multiaddr,
76
    },
77
    /// Notify the manager of an event from the connection.
78
    Notify {
79
        id: ConnectionId,
80
        peer_id: PeerId,
81
        event: ToBehaviour,
82
    },
83
    /// A connection closed, possibly due to an error.
84
    ///
85
    /// If `error` is `None`, the connection has completed
86
    /// an active orderly close.
87
    Closed {
88
        id: ConnectionId,
89
        peer_id: PeerId,
90
        error: Option<ConnectionError>,
91
    },
92
}
93

94
pub(crate) async fn new_for_pending_outgoing_connection(
28,306✔
95
    connection_id: ConnectionId,
28,306✔
96
    dial: ConcurrentDial,
28,306✔
97
    abort_receiver: oneshot::Receiver<Infallible>,
28,306✔
98
    mut events: mpsc::Sender<PendingConnectionEvent>,
28,306✔
99
) {
28,306✔
100
    match futures::future::select(abort_receiver, Box::pin(dial)).await {
28,267✔
101
        Either::Left((Err(oneshot::Canceled), _)) => {
102
            let _ = events
1,115✔
103
                .send(PendingConnectionEvent::PendingFailed {
1,115✔
104
                    id: connection_id,
1,115✔
105
                    error: Either::Left(PendingOutboundConnectionError::Aborted),
1,115✔
106
                })
1,115✔
107
                .await;
1,115✔
108
        }
109
        Either::Left((Ok(v), _)) => libp2p_core::util::unreachable(v),
110
        Either::Right((Ok((address, output, errors)), _)) => {
25,852✔
111
            let _ = events
25,852✔
112
                .send(PendingConnectionEvent::ConnectionEstablished {
25,852✔
113
                    id: connection_id,
25,852✔
114
                    output,
25,852✔
115
                    outgoing: Some((address, errors)),
25,852✔
116
                })
25,852✔
117
                .await;
25,852✔
118
        }
119
        Either::Right((Err(e), _)) => {
350✔
120
            let _ = events
350✔
121
                .send(PendingConnectionEvent::PendingFailed {
350✔
122
                    id: connection_id,
350✔
123
                    error: Either::Left(PendingOutboundConnectionError::Transport(e)),
350✔
124
                })
350✔
125
                .await;
350✔
126
        }
127
    }
128
}
27,056✔
129

130
pub(crate) async fn new_for_pending_incoming_connection<TFut>(
15,676✔
131
    connection_id: ConnectionId,
15,676✔
132
    future: TFut,
15,676✔
133
    abort_receiver: oneshot::Receiver<Infallible>,
15,676✔
134
    mut events: mpsc::Sender<PendingConnectionEvent>,
15,676✔
135
) where
15,676✔
136
    TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
15,676✔
137
{
15,664✔
138
    match futures::future::select(abort_receiver, Box::pin(future)).await {
15,664✔
139
        Either::Left((Err(oneshot::Canceled), _)) => {
140
            let _ = events
219✔
141
                .send(PendingConnectionEvent::PendingFailed {
219✔
142
                    id: connection_id,
219✔
143
                    error: Either::Right(PendingInboundConnectionError::Aborted),
219✔
144
                })
219✔
145
                .await;
219✔
146
        }
147
        Either::Left((Ok(v), _)) => libp2p_core::util::unreachable(v),
148
        Either::Right((Ok(output), _)) => {
14,895✔
149
            let _ = events
14,895✔
150
                .send(PendingConnectionEvent::ConnectionEstablished {
14,895✔
151
                    id: connection_id,
14,895✔
152
                    output,
14,895✔
153
                    outgoing: None,
14,895✔
154
                })
14,895✔
155
                .await;
14,895✔
156
        }
157
        Either::Right((Err(e), _)) => {
159✔
158
            let _ = events
159✔
159
                .send(PendingConnectionEvent::PendingFailed {
159✔
160
                    id: connection_id,
159✔
161
                    error: Either::Right(PendingInboundConnectionError::Transport(
159✔
162
                        TransportError::Other(e),
159✔
163
                    )),
159✔
164
                })
159✔
165
                .await;
159✔
166
        }
167
    }
168
}
15,080✔
169

170
pub(crate) async fn new_for_established_connection<THandler>(
28,960✔
171
    connection_id: ConnectionId,
28,960✔
172
    peer_id: PeerId,
28,960✔
173
    mut connection: crate::connection::Connection<THandler>,
28,960✔
174
    mut command_receiver: mpsc::Receiver<Command<THandler::FromBehaviour>>,
28,960✔
175
    mut events: mpsc::Sender<EstablishedConnectionEvent<THandler::ToBehaviour>>,
28,960✔
176
) where
28,960✔
177
    THandler: ConnectionHandler,
28,960✔
178
{
28,713✔
179
    loop {
180
        match futures::future::select(
109,642✔
181
            command_receiver.next(),
109,642✔
182
            poll_fn(|cx| Pin::new(&mut connection).poll(cx)),
289,971✔
183
        )
184
        .await
109,642✔
185
        {
186
            Either::Left((Some(command), _)) => match command {
24,264✔
187
                Command::NotifyHandler(event) => connection.on_behaviour_event(event),
24,234✔
188
                Command::Close => {
189
                    command_receiver.close();
30✔
190
                    let (remaining_events, closing_muxer) = connection.close();
30✔
191

192
                    let _ = events
30✔
193
                        .send_all(&mut remaining_events.map(|event| {
32✔
194
                            Ok(EstablishedConnectionEvent::Notify {
3✔
195
                                id: connection_id,
3✔
196
                                event,
3✔
197
                                peer_id,
3✔
198
                            })
3✔
199
                        }))
3✔
200
                        .await;
30✔
201

202
                    let error = closing_muxer.await.err().map(ConnectionError::IO);
30✔
203

204
                    let _ = events
30✔
205
                        .send(EstablishedConnectionEvent::Closed {
30✔
206
                            id: connection_id,
30✔
207
                            peer_id,
30✔
208
                            error,
30✔
209
                        })
30✔
210
                        .await;
30✔
211
                    return;
30✔
212
                }
213
            },
214

215
            // The manager has disappeared; abort.
216
            Either::Left((None, _)) => return,
9,691✔
217

218
            Either::Right((event, _)) => {
58,980✔
219
                match event {
56,695✔
220
                    Ok(connection::Event::Handler(event)) => {
56,695✔
221
                        let _ = events
56,695✔
222
                            .send(EstablishedConnectionEvent::Notify {
56,695✔
223
                                id: connection_id,
56,695✔
224
                                peer_id,
56,695✔
225
                                event,
56,695✔
226
                            })
56,695✔
227
                            .await;
56,695✔
228
                    }
229
                    Ok(connection::Event::AddressChange(new_address)) => {
×
230
                        let _ = events
×
231
                            .send(EstablishedConnectionEvent::AddressChange {
×
232
                                id: connection_id,
×
233
                                peer_id,
×
234
                                new_address,
×
235
                            })
×
236
                            .await;
×
237
                    }
238
                    Err(error) => {
2,285✔
239
                        command_receiver.close();
2,285✔
240
                        let (remaining_events, _closing_muxer) = connection.close();
2,285✔
241

242
                        let _ = events
2,285✔
243
                            .send_all(&mut remaining_events.map(|event| {
2,287✔
244
                                Ok(EstablishedConnectionEvent::Notify {
3✔
245
                                    id: connection_id,
3✔
246
                                    event,
3✔
247
                                    peer_id,
3✔
248
                                })
3✔
249
                            }))
3✔
250
                            .await;
2,285✔
251

252
                        // Terminate the task with the error, dropping the connection.
253
                        let _ = events
2,285✔
254
                            .send(EstablishedConnectionEvent::Closed {
2,285✔
255
                                id: connection_id,
2,285✔
256
                                peer_id,
2,285✔
257
                                error: Some(error),
2,285✔
258
                            })
2,285✔
259
                            .await;
2,285✔
260
                        return;
2,285✔
261
                    }
262
                }
263
            }
264
        }
265
    }
266
}
12,006✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc