• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 15025629296

14 May 2025 04:07PM UTC coverage: 71.734% (-0.01%) from 71.744%
15025629296

Pull #587

github

web-flow
Merge f639d7c98 into 825a3b2f8
Pull Request #587: Thv/benchmark wallet state update

4 of 4 new or added lines in 4 files covered. (100.0%)

13 existing lines in 2 files now uncovered.

19983 of 27857 relevant lines covered (71.73%)

382537.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.79
/src/connect_to_peers.rs
1
use std::fmt::Debug;
2
use std::net::SocketAddr;
3
use std::time::SystemTime;
4

5
use anyhow::bail;
6
use anyhow::ensure;
7
use anyhow::Result;
8
use futures::FutureExt;
9
use futures::SinkExt;
10
use futures::TryStreamExt;
11
use tokio::io::AsyncRead;
12
use tokio::io::AsyncWrite;
13
use tokio::sync::broadcast;
14
use tokio::sync::mpsc;
15
use tokio_serde::formats::Bincode;
16
use tokio_serde::formats::SymmetricalBincode;
17
use tokio_serde::SymmetricallyFramed;
18
use tokio_util::codec::Framed;
19
use tokio_util::codec::LengthDelimitedCodec;
20
use tracing::debug;
21
use tracing::error;
22
use tracing::info;
23
use tracing::warn;
24

25
use crate::models::channel::MainToPeerTask;
26
use crate::models::channel::PeerTaskToMain;
27
use crate::models::peer::ConnectionRefusedReason;
28
use crate::models::peer::InternalConnectionStatus;
29
use crate::models::peer::NegativePeerSanction;
30
use crate::models::peer::PeerMessage;
31
use crate::models::peer::PeerSanction;
32
use crate::models::peer::PeerStanding;
33
use crate::models::peer::TransferConnectionStatus;
34
use crate::models::state::GlobalStateLock;
35
use crate::peer_loop::PeerLoopHandler;
36
use crate::HandshakeData;
37
use crate::MAGIC_STRING_REQUEST;
38
use crate::MAGIC_STRING_RESPONSE;
39

40
// Max peer message size is 500 MiB. Should be enough to send 250 blocks in a
41
// block batch-response.
42
pub const MAX_PEER_FRAME_LENGTH_IN_BYTES: usize = 500 * 1024 * 1024;
43

44
/// Use this function to ensure that the same rules apply for both
45
/// ingoing and outgoing connections. This limits the size of messages
46
/// peers can send.
47
fn get_codec_rules() -> LengthDelimitedCodec {
16✔
48
    let mut codec_rules = LengthDelimitedCodec::new();
16✔
49
    codec_rules.set_max_frame_length(MAX_PEER_FRAME_LENGTH_IN_BYTES);
16✔
50
    codec_rules
16✔
51
}
16✔
52

53
/// Returns true iff version numbers are compatible. Returns false otherwise.
54
///
55
/// # Panics
56
///
57
/// panics if own version could not be parsed.
58
fn versions_are_compatible(own_version: &str, other_version: &str) -> bool {
85✔
59
    let own_version = semver::Version::parse(own_version)
85✔
60
        .unwrap_or_else(|_| panic!("Must be able to parse own version string. Got: {own_version}"));
85✔
61
    let other_version = match semver::Version::parse(other_version) {
85✔
62
        Ok(version) => version,
83✔
63
        Err(err) => {
2✔
64
            warn!("Peer version is not a valid semver version. Got error: {err}",);
2✔
65
            return false;
2✔
66
        }
67
    };
68

69
    // All alphanet and betanet versions are incompatible with each other.
70
    // Alpha and betanet have versions "0.0.n". Alpha and betanet are
71
    // incompatible with all other versions.
72
    if own_version.major == 0 && own_version.minor == 0
83✔
73
        || other_version.major == 0 && other_version.minor == 0
82✔
74
    {
75
        return own_version == other_version;
1✔
76
    }
82✔
77

78
    true
82✔
79
}
85✔
80

81
/// Check if connection is allowed. Used for both ingoing and outgoing connections.
82
///
83
/// Locking:
84
///   * acquires `global_state_lock` for read
85
async fn check_if_connection_is_allowed(
28✔
86
    global_state_lock: GlobalStateLock,
28✔
87
    own_handshake: &HandshakeData,
28✔
88
    other_handshake: &HandshakeData,
28✔
89
    peer_address: &SocketAddr,
28✔
90
) -> InternalConnectionStatus {
28✔
91
    let cli_arguments = global_state_lock.cli();
28✔
92
    let global_state = global_state_lock.lock_guard().await;
28✔
93

94
    // Disallow connection if peer is banned via CLI arguments
95
    if cli_arguments.ban.contains(&peer_address.ip()) {
28✔
96
        let ip = peer_address.ip();
1✔
97
        warn!("Peer {ip}, banned via CLI argument, attempted to connect. Disallowing.");
1✔
98
        return InternalConnectionStatus::Refused(ConnectionRefusedReason::BadStanding);
1✔
99
    }
27✔
100

101
    // Disallow connection if peer is in bad standing
102
    let standing = global_state
27✔
103
        .net
27✔
104
        .get_peer_standing_from_database(peer_address.ip())
27✔
105
        .await;
27✔
106

107
    if standing.is_some_and(|s| s.is_bad()) {
27✔
108
        let ip = peer_address.ip();
2✔
109
        warn!("Peer {ip}, banned because of bad standing, attempted to connect. Disallowing.");
2✔
110
        return InternalConnectionStatus::Refused(ConnectionRefusedReason::BadStanding);
2✔
111
    }
25✔
112

113
    if let Some(time) = global_state
25✔
114
        .net
25✔
115
        .last_disconnection_time_of_peer(other_handshake.instance_id)
25✔
116
    {
117
        if SystemTime::now()
2✔
118
            .duration_since(time)
2✔
119
            .is_ok_and(|d| d < cli_arguments.reconnect_cooldown)
2✔
120
        {
121
            info!(
1✔
122
                "Refusing connection with {peer_address} \
×
123
                 due to reconnect cooldown ({cooldown} seconds).",
×
124
                cooldown = cli_arguments.reconnect_cooldown.as_secs(),
×
125
            );
126

127
            // A “wrong” reason is given because of backwards compatibility.
128
            // todo: Use next breaking release to give a more accurate reason here.
129
            let reason = ConnectionRefusedReason::MaxPeerNumberExceeded;
1✔
130
            return InternalConnectionStatus::Refused(reason);
1✔
131
        }
1✔
132
    }
23✔
133

134
    // Disallow connection if max number of peers has been reached or
135
    // exceeded. There is another test in `answer_peer_inner` that precedes
136
    // this one; however this test is still necessary to resolve potential
137
    // race conditions.
138
    // Note that if we are bootstrapping, then we *do* want to accept the
139
    // connection and temporarily exceed the maximum. In this case a
140
    // `DisconnectFromLongestLivedPeer` message should have been sent to
141
    // the main loop already but that message need not have been processed by
142
    // the time we get here.
143
    if cli_arguments.max_num_peers <= global_state.net.peer_map.len() && !cli_arguments.bootstrap {
24✔
144
        return InternalConnectionStatus::Refused(ConnectionRefusedReason::MaxPeerNumberExceeded);
2✔
145
    }
22✔
146

147
    // Disallow connection to already connected peer.
148
    if global_state.net.peer_map.values().any(|peer| {
29✔
149
        peer.instance_id() == other_handshake.instance_id
29✔
150
            || *peer_address == peer.connected_address()
28✔
151
    }) {
29✔
152
        return InternalConnectionStatus::Refused(ConnectionRefusedReason::AlreadyConnected);
1✔
153
    }
21✔
154

155
    // Cap connections per IP, if specified.
156
    if let Some(max_connections_per_ip) = cli_arguments.max_connections_per_ip {
21✔
157
        let peer_ip = peer_address.ip();
6✔
158
        let num_connections_to_this_ip = global_state
6✔
159
            .net
6✔
160
            .peer_map
6✔
161
            .keys()
6✔
162
            .map(|x| x.ip())
15✔
163
            .filter(|ip| *ip == peer_ip)
15✔
164
            .count();
6✔
165
        if num_connections_to_this_ip >= max_connections_per_ip {
6✔
166
            return InternalConnectionStatus::Refused(
1✔
167
                ConnectionRefusedReason::MaxPeerNumberExceeded,
1✔
168
            );
1✔
169
        }
5✔
170
    }
15✔
171

172
    // Disallow connection to self
173
    if own_handshake.instance_id == other_handshake.instance_id {
20✔
174
        return InternalConnectionStatus::Refused(ConnectionRefusedReason::SelfConnect);
1✔
175
    }
19✔
176

177
    // Disallow connection if versions are incompatible
178
    if !versions_are_compatible(&own_handshake.version, &other_handshake.version) {
19✔
179
        warn!(
1✔
180
            "Attempting to connect to incompatible version. You might have to upgrade, or the other node does. Own version: {}, other version: {}",
×
181
            own_handshake.version,
182
            other_handshake.version);
183
        return InternalConnectionStatus::Refused(ConnectionRefusedReason::IncompatibleVersion);
1✔
184
    }
18✔
185

186
    // If this connection touches the maximum number of peer connections, say
187
    // so with special OK code.
188
    if cli_arguments.max_num_peers == global_state.net.peer_map.len() + 1 {
18✔
189
        info!("ConnectionStatus::Accepted, but max # connections is now reached");
1✔
190
        return InternalConnectionStatus::AcceptedMaxReached;
1✔
191
    }
17✔
192

193
    info!("ConnectionStatus::Accepted");
17✔
194
    InternalConnectionStatus::Accepted
17✔
195
}
28✔
196

197
/// Respond to an incoming connection initiation.
198
///
199
/// Catch and process errors (if any) gracefully.
200
///
201
/// All incoming connections from peers must go through this function.
202
pub(crate) async fn answer_peer<S>(
4✔
203
    stream: S,
4✔
204
    state_lock: GlobalStateLock,
4✔
205
    peer_address: std::net::SocketAddr,
4✔
206
    main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
4✔
207
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
4✔
208
    own_handshake_data: HandshakeData,
4✔
209
) -> Result<()>
4✔
210
where
4✔
211
    S: AsyncRead + AsyncWrite + std::fmt::Debug + std::marker::Unpin,
4✔
212
{
4✔
213
    let state_lock_clone = state_lock.clone();
4✔
214
    let peer_task_to_main_tx_clone = peer_task_to_main_tx.clone();
4✔
215
    let mut inner_ret: anyhow::Result<()> = Ok(());
4✔
216

217
    let panic_result = std::panic::AssertUnwindSafe(async {
4✔
218
        inner_ret = answer_peer_inner(
4✔
219
            stream,
4✔
220
            state_lock_clone,
4✔
221
            peer_address,
4✔
222
            main_to_peer_task_rx,
4✔
223
            peer_task_to_main_tx,
4✔
224
            own_handshake_data,
4✔
225
        )
4✔
226
        .await;
4✔
UNCOV
227
    })
×
228
    .catch_unwind()
4✔
229
    .await;
4✔
230

UNCOV
231
    match panic_result {
×
232
        Ok(_) => (),
×
UNCOV
233
        Err(_err) => {
×
UNCOV
234
            error!("Peer task (incoming) for {peer_address} panicked. Invoking close connection callback");
×
UNCOV
235
            close_peer_connected_callback(
×
UNCOV
236
                state_lock.clone(),
×
UNCOV
237
                peer_address,
×
UNCOV
238
                &peer_task_to_main_tx_clone,
×
UNCOV
239
            )
×
UNCOV
240
            .await;
×
241
        }
242
    }
243

244
    inner_ret
×
245
}
×
246

247
async fn answer_peer_inner<S>(
12✔
248
    stream: S,
12✔
249
    state: GlobalStateLock,
12✔
250
    peer_address: SocketAddr,
12✔
251
    main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
12✔
252
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
12✔
253
    own_handshake_data: HandshakeData,
12✔
254
) -> Result<()>
12✔
255
where
12✔
256
    S: AsyncRead + AsyncWrite + Debug + Unpin,
12✔
257
{
12✔
258
    info!("Established incoming TCP connection with {peer_address}");
12✔
259

260
    // Build the communication/serialization/frame handler
261
    let length_delimited = Framed::new(stream, get_codec_rules());
12✔
262
    let mut peer = SymmetricallyFramed::<
12✔
263
        Framed<S, LengthDelimitedCodec>,
12✔
264
        PeerMessage,
12✔
265
        Bincode<PeerMessage, PeerMessage>,
12✔
266
    >::new(length_delimited, SymmetricalBincode::default());
12✔
267

268
    // Complete Neptune handshake
269
    let Some(PeerMessage::Handshake(payload)) = peer.try_next().await? else {
12✔
270
        bail!("Didn't get handshake on connection attempt");
×
271
    };
272
    let (magic_string_request, peer_handshake_data) = *payload;
12✔
273
    ensure!(
12✔
274
        magic_string_request == MAGIC_STRING_REQUEST,
12✔
275
        "Expected magic value, got {magic_string_request:?}",
1✔
276
    );
277

278
    let handshake_response = Box::new((MAGIC_STRING_RESPONSE.to_vec(), own_handshake_data.clone()));
11✔
279
    peer.send(PeerMessage::Handshake(handshake_response))
11✔
280
        .await?;
11✔
281

282
    // Verify peer network before moving on
283
    let peer_network = peer_handshake_data.network;
11✔
284
    let own_network = own_handshake_data.network;
11✔
285
    ensure!(
11✔
286
        peer_network == own_network,
11✔
287
        "Cannot connect with {peer_address}: \
2✔
288
        Peer runs {peer_network}, this client runs {own_network}."
2✔
289
    );
290

291
    // Check if incoming connection is allowed
292
    let connection_status = check_if_connection_is_allowed(
9✔
293
        state.clone(),
9✔
294
        &own_handshake_data,
9✔
295
        &peer_handshake_data,
9✔
296
        &peer_address,
9✔
297
    )
9✔
298
    .await;
9✔
299
    peer.send(PeerMessage::ConnectionStatus(connection_status.into()))
9✔
300
        .await?;
9✔
301
    if let InternalConnectionStatus::Refused(reason) = connection_status {
9✔
302
        let reason = format!("Refusing incoming connection. Reason: {reason:?}");
3✔
303
        warn!("{reason}");
3✔
304
        bail!("{reason}");
3✔
305
    }
6✔
306

307
    // Whether the incoming connection comes from a peer in bad standing is
308
    // checked in `check_if_connection_is_allowed`. So if we get here, we are
309
    // good to go.
310
    info!("Connection accepted from {peer_address}");
6✔
311

312
    // If necessary, disconnect from another, existing peer.
313
    if connection_status == InternalConnectionStatus::AcceptedMaxReached && state.cli().bootstrap {
6✔
314
        info!("Maximum # peers reached, so disconnecting from an existing peer.");
1✔
315
        peer_task_to_main_tx
1✔
316
            .send(PeerTaskToMain::DisconnectFromLongestLivedPeer)
1✔
317
            .await?;
1✔
318
    }
5✔
319

320
    let peer_distance = 1; // All incoming connections have distance 1
6✔
321
    let mut peer_loop_handler = PeerLoopHandler::new(
6✔
322
        peer_task_to_main_tx,
6✔
323
        state,
6✔
324
        peer_address,
6✔
325
        peer_handshake_data,
6✔
326
        true,
327
        peer_distance,
6✔
328
    );
329

330
    peer_loop_handler
6✔
331
        .run_wrapper(peer, main_to_peer_task_rx)
6✔
332
        .await?;
6✔
333

334
    Ok(())
2✔
335
}
8✔
336

337
/// Perform handshake and establish connection to a new peer while handling any
338
/// panics in the peer task gracefully.
339
///
340
/// All outgoing connections to peers must go through this function.
341
pub(crate) async fn call_peer(
6✔
342
    peer_address: std::net::SocketAddr,
6✔
343
    state: GlobalStateLock,
6✔
344
    main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
6✔
345
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
6✔
346
    own_handshake_data: HandshakeData,
6✔
347
    peer_distance: u8,
6✔
348
) {
6✔
349
    let state_clone = state.clone();
6✔
350
    let peer_task_to_main_tx_clone = peer_task_to_main_tx.clone();
6✔
351
    let panic_result = std::panic::AssertUnwindSafe(async {
6✔
352
        debug!("Attempting to initiate connection to {peer_address}");
6✔
353
        match tokio::net::TcpStream::connect(peer_address).await {
6✔
354
            Err(e) => {
3✔
355
                let msg = format!("Failed to establish TCP connection to {peer_address}: {e}");
3✔
356
                if peer_distance == 1 {
3✔
357
                    // outgoing connection to peer of distance 1 means user has
358
                    // requested a connection to this peer through CLI
359
                    // arguments, and should be warned if this fails.
360
                    warn!("{msg}");
3✔
361
                } else {
362
                    info!("{msg}");
×
363
                }
364
            }
365
            Ok(stream) => {
3✔
366
                match call_peer_inner(
3✔
367
                    stream,
3✔
368
                    state,
3✔
369
                    peer_address,
3✔
370
                    main_to_peer_task_rx,
3✔
371
                    peer_task_to_main_tx,
3✔
372
                    &own_handshake_data,
3✔
373
                    peer_distance,
3✔
374
                )
375
                .await
3✔
376
                {
377
                    Ok(()) => (),
×
378
                    Err(e) => {
×
379
                        let msg = format!("{e}. Failed to establish connection.");
×
380
                        // outgoing connection to peer of distance 1 means user has
381
                        // requested a connection to this peer through CLI
382
                        // arguments, and should be warned if this fails.
383
                        if peer_distance == 1 {
×
384
                            warn!("{msg}");
×
385
                        } else {
386
                            info!("{msg}");
×
387
                        }
388
                    }
389
                }
390
            }
391
        };
392

393
        info!("Connection to {peer_address} closing");
3✔
394
    })
3✔
395
    .catch_unwind()
6✔
396
    .await;
6✔
397

398
    match panic_result {
4✔
399
        Ok(_) => (),
3✔
400
        Err(_) => {
401
            error!("Peer task (outgoing) for {peer_address} panicked. Invoking close connection callback");
1✔
402
            close_peer_connected_callback(state_clone, peer_address, &peer_task_to_main_tx_clone)
1✔
403
                .await;
1✔
404
        }
405
    }
406
}
3✔
407

408
async fn call_peer_inner<S>(
4✔
409
    stream: S,
4✔
410
    state: GlobalStateLock,
4✔
411
    peer_address: std::net::SocketAddr,
4✔
412
    main_to_peer_task_rx: broadcast::Receiver<MainToPeerTask>,
4✔
413
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
4✔
414
    own_handshake: &HandshakeData,
4✔
415
    peer_distance: u8,
4✔
416
) -> Result<()>
4✔
417
where
4✔
418
    S: AsyncRead + AsyncWrite + Debug + Unpin,
4✔
419
{
4✔
420
    info!("Established outgoing TCP connection with {peer_address}");
4✔
421

422
    // Build the communication/serialization/frame handler
423
    let length_delimited = Framed::new(stream, get_codec_rules());
4✔
424
    let mut peer: tokio_serde::Framed<
4✔
425
        Framed<S, LengthDelimitedCodec>,
4✔
426
        PeerMessage,
4✔
427
        PeerMessage,
4✔
428
        Bincode<PeerMessage, PeerMessage>,
4✔
429
    > = SymmetricallyFramed::new(length_delimited, SymmetricalBincode::default());
4✔
430

431
    // Make Neptune handshake
432
    peer.send(PeerMessage::Handshake(Box::new((
4✔
433
        Vec::from(MAGIC_STRING_REQUEST),
4✔
434
        own_handshake.to_owned(),
4✔
435
    ))))
4✔
436
    .await?;
4✔
437
    debug!("Awaiting connection status response from {peer_address}");
4✔
438

439
    let Some(PeerMessage::Handshake(handshake_payload)) = peer.try_next().await? else {
4✔
440
        bail!("Didn't get handshake response from {peer_address}");
×
441
    };
442
    let (magic_string_response, other_handshake) = *handshake_payload;
4✔
443
    ensure!(
4✔
444
        magic_string_response == MAGIC_STRING_RESPONSE,
4✔
445
        "Didn't get expected magic value for handshake from {peer_address}",
×
446
    );
447

448
    debug!("Got correct magic value response from {peer_address}!");
4✔
449
    if other_handshake.network != own_handshake.network {
4✔
450
        let other = other_handshake.network;
×
451
        let own = own_handshake.network;
×
452
        bail!("Cannot connect with {peer_address}: Peer runs {other}, this client runs {own}.");
×
453
    }
4✔
454

455
    match peer.try_next().await? {
4✔
456
        Some(PeerMessage::ConnectionStatus(TransferConnectionStatus::Accepted)) => {
457
            info!("Outgoing connection accepted by {peer_address}");
4✔
458
        }
459
        Some(PeerMessage::ConnectionStatus(TransferConnectionStatus::Refused(reason))) => {
×
460
            bail!("Outgoing connection attempt to {peer_address} refused. Reason: {reason:?}");
×
461
        }
462
        _ => {
463
            bail!(
×
464
                "Got invalid connection status response from {peer_address} on outgoing connection"
×
465
            );
466
        }
467
    }
468

469
    // Peer accepted us. Check if we accept the peer. Note that the protocol does not stipulate
470
    // that we answer with a connection status here, so if the connection is *not* accepted, we
471
    // simply hang up but log the reason for the refusal.
472
    let connection_status = check_if_connection_is_allowed(
4✔
473
        state.clone(),
4✔
474
        own_handshake,
4✔
475
        &other_handshake,
4✔
476
        &peer_address,
4✔
477
    )
4✔
478
    .await;
4✔
479
    if let InternalConnectionStatus::Refused(refused_reason) = connection_status {
4✔
480
        warn!(
×
481
            "Outgoing connection to {peer_address} refused. Reason: {:?}\nNow hanging up.",
×
482
            refused_reason
483
        );
484
        peer.send(PeerMessage::Bye).await?;
×
485
        bail!("Attempted to connect to peer ({peer_address}) that was not allowed. This connection attempt should not have been made.");
×
486
    }
4✔
487

488
    // By default, start by asking the peer for its peers. In an adversarial
489
    // context, we want the network topology to be as robust as possible.
490
    // Blockchain data can be obtained from other peers, if this connection
491
    // fails.
492
    peer.send(PeerMessage::PeerListRequest).await?;
4✔
493

494
    let mut peer_loop_handler = PeerLoopHandler::new(
4✔
495
        peer_task_to_main_tx,
4✔
496
        state,
4✔
497
        peer_address,
4✔
498
        other_handshake,
4✔
499
        false,
500
        peer_distance,
4✔
501
    );
502
    peer_loop_handler
4✔
503
        .run_wrapper(peer, main_to_peer_task_rx)
4✔
504
        .await?;
4✔
505

506
    Ok(())
1✔
507
}
1✔
508

509
/// Remove peer from state. This function must be called every time
510
/// a peer is disconnected. Whether this happens through a panic
511
/// in the peer task or through a regular disconnect.
512
///
513
/// Locking:
514
///   * acquires `global_state_lock` for write
515
pub(crate) async fn close_peer_connected_callback(
33✔
516
    mut global_state_lock: GlobalStateLock,
33✔
517
    peer_address: SocketAddr,
33✔
518
    to_main_tx: &mpsc::Sender<PeerTaskToMain>,
33✔
519
) {
33✔
520
    let cli_arguments = global_state_lock.cli().clone();
33✔
521
    let mut global_state_mut = global_state_lock.lock_guard_mut().await;
33✔
522

523
    // Store any new peer-standing to database
524
    let peer_info_writeback = global_state_mut.net.peer_map.remove(&peer_address);
33✔
525
    let new_standing = if let Some(new) = peer_info_writeback {
33✔
526
        new.standing()
32✔
527
    } else {
528
        error!("Could not find peer standing for {peer_address}");
1✔
529
        let mut standing = PeerStanding::new(cli_arguments.peer_tolerance);
1✔
530
        let sanction = NegativePeerSanction::NoStandingFoundMaybeCrash;
1✔
531

532
        // Don't return early: _must_ send message to main loop at the end of this
533
        // function.
534
        // If the peer has now reached bad standing, the connection to it should be
535
        // dropped, which is currently happening anyway.
536
        let _ = standing.sanction(PeerSanction::Negative(sanction));
1✔
537
        standing
1✔
538
    };
539
    debug!("Fetched peer info standing {new_standing} for peer {peer_address}");
33✔
540

541
    global_state_mut
33✔
542
        .net
33✔
543
        .write_peer_standing_on_decrease(peer_address.ip(), new_standing)
33✔
544
        .await;
33✔
545
    drop(global_state_mut); // avoid holding across mpsc::Sender::send()
31✔
546
    debug!("Stored peer info standing {new_standing} for peer {peer_address}");
31✔
547

548
    // This message is used to determine if we are to exit synchronization mode
549
    to_main_tx
31✔
550
        .send(PeerTaskToMain::RemovePeerMaxBlockHeight(peer_address))
31✔
551
        .await
31✔
552
        .expect("channel to main task should exist");
31✔
553
}
31✔
554

555
#[cfg(test)]
556
#[cfg_attr(coverage_nightly, coverage(off))]
557
mod tests {
558
    use std::str::FromStr;
559
    use std::time::Duration;
560
    use std::time::SystemTime;
561

562
    use anyhow::bail;
563
    use anyhow::Result;
564
    use macro_rules_attr::apply;
565
    use tokio_test::io::Builder;
566
    use tracing_test::traced_test;
567
    use twenty_first::math::digest::Digest;
568

569
    use super::*;
570
    use crate::config_models::cli_args;
571
    use crate::config_models::network::Network;
572
    use crate::models::peer::handshake_data::VersionString;
573
    use crate::models::peer::peer_info::PeerInfo;
574
    use crate::models::peer::InternalConnectionStatus;
575
    use crate::models::peer::NegativePeerSanction;
576
    use crate::models::peer::PeerMessage;
577
    use crate::models::peer::PeerStanding;
578
    use crate::prelude::twenty_first;
579
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
580
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
581
    use crate::tests::shared::get_dummy_peer_incoming;
582
    use crate::tests::shared::get_dummy_socket_address;
583
    use crate::tests::shared::get_test_genesis_setup;
584
    use crate::tests::shared::to_bytes;
585
    use crate::tests::shared_tokio_runtime;
586
    use crate::MAGIC_STRING_REQUEST;
587
    use crate::MAGIC_STRING_RESPONSE;
588

589
    #[traced_test]
590
    #[apply(shared_tokio_runtime)]
591
    async fn test_outgoing_connection_succeed() -> Result<()> {
592
        let network = Network::Alpha;
593
        let other_handshake = get_dummy_handshake_data_for_genesis(network);
594
        let own_handshake = get_dummy_handshake_data_for_genesis(network);
595
        let mock = Builder::new()
596
            .write(&to_bytes(&PeerMessage::Handshake(Box::new((
597
                MAGIC_STRING_REQUEST.to_vec(),
598
                own_handshake.clone(),
599
            ))))?)
600
            .read(&to_bytes(&PeerMessage::Handshake(Box::new((
601
                MAGIC_STRING_RESPONSE.to_vec(),
602
                other_handshake,
603
            ))))?)
604
            .read(&to_bytes(&PeerMessage::ConnectionStatus(
605
                TransferConnectionStatus::Accepted,
606
            ))?)
607
            .write(&to_bytes(&PeerMessage::PeerListRequest)?)
608
            .read(&to_bytes(&PeerMessage::Bye)?)
609
            .build();
610

611
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state, _hsd) =
612
            get_test_genesis_setup(Network::Alpha, 0, cli_args::Args::default()).await?;
613
        call_peer_inner(
614
            mock,
615
            state.clone(),
616
            get_dummy_socket_address(0),
617
            from_main_rx_clone,
618
            to_main_tx,
619
            &own_handshake,
620
            1,
621
        )
622
        .await?;
623

624
        // Verify that peer map is empty after connection has been closed
625
        match state.lock(|s| s.net.peer_map.keys().len()).await {
626
            0 => (),
627
            _ => bail!("Incorrect number of maps in peer map"),
628
        };
629

630
        Ok(())
631
    }
632

633
    #[test]
634
    fn malformed_version_from_peer_doesnt_crash() {
635
        let version_numbers = ["potato", "&&&&"];
636
        for b in version_numbers {
637
            assert!(!versions_are_compatible("0.1.0", b));
638
        }
639
    }
640

641
    #[test]
642
    fn versions_are_compatible_for_all_versions_above_0_1_0() {
643
        let version_numbers = [
644
            "0.1.0",
645
            "0.1.1",
646
            "0.1.99",
647
            "0.2.0",
648
            "1.2.0",
649
            "2.2.0",
650
            "3.2.0",
651
            "9999.99999.9999",
652
        ];
653
        for a in version_numbers {
654
            for b in version_numbers {
655
                assert!(versions_are_compatible(a, b));
656
            }
657
        }
658
    }
659

660
    #[traced_test]
661
    #[apply(shared_tokio_runtime)]
662
    async fn test_get_connection_status() -> Result<()> {
663
        let network = Network::Alpha;
664
        let (_, _, _, _, mut state_lock, own_handshake) =
665
            get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
666

667
        // Get an address for a peer that's not already connected
668
        let (other_handshake, peer_sa) = get_dummy_peer_connection_data_genesis(network, 1);
669

670
        let mut status = check_if_connection_is_allowed(
671
            state_lock.clone(),
672
            &own_handshake,
673
            &other_handshake,
674
            &peer_sa,
675
        )
676
        .await;
677
        assert_eq!(InternalConnectionStatus::Accepted, status);
678

679
        status = check_if_connection_is_allowed(
680
            state_lock.clone(),
681
            &own_handshake,
682
            &own_handshake,
683
            &peer_sa,
684
        )
685
        .await;
686
        assert_eq!(
687
            InternalConnectionStatus::Refused(ConnectionRefusedReason::SelfConnect),
688
            status,
689
        );
690

691
        // pretend --max_peers is 1.
692
        let mut cli = state_lock.cli().clone();
693
        cli.max_num_peers = 1;
694
        state_lock.set_cli(cli.clone()).await;
695

696
        status = check_if_connection_is_allowed(
697
            state_lock.clone(),
698
            &own_handshake,
699
            &other_handshake,
700
            &peer_sa,
701
        )
702
        .await;
703
        assert_eq!(
704
            InternalConnectionStatus::Refused(ConnectionRefusedReason::MaxPeerNumberExceeded),
705
            status,
706
        );
707

708
        // pretend --max-peers is 100
709
        cli.max_num_peers = 100;
710
        state_lock.set_cli(cli.clone()).await;
711

712
        // Attempt to connect to already connected peer
713
        let connected_peer: PeerInfo = state_lock
714
            .lock(|s| s.net.peer_map.values().collect::<Vec<_>>()[0].clone())
715
            .await;
716
        let mut mutated_other_handshake = other_handshake.clone();
717
        mutated_other_handshake.instance_id = connected_peer.instance_id();
718
        status = check_if_connection_is_allowed(
719
            state_lock.clone(),
720
            &own_handshake,
721
            &mutated_other_handshake,
722
            &peer_sa,
723
        )
724
        .await;
725
        assert_eq!(
726
            InternalConnectionStatus::Refused(ConnectionRefusedReason::AlreadyConnected),
727
            status,
728
        );
729

730
        // pretend --ban <peer_sa>
731
        cli.ban.push(peer_sa.ip());
732
        state_lock.set_cli(cli.clone()).await;
733

734
        // Verify that banned peers are rejected by this check
735
        // First check that peers can be banned by command-line arguments
736
        status = check_if_connection_is_allowed(
737
            state_lock.clone(),
738
            &own_handshake,
739
            &other_handshake,
740
            &peer_sa,
741
        )
742
        .await;
743
        assert_eq!(
744
            InternalConnectionStatus::Refused(ConnectionRefusedReason::BadStanding),
745
            status,
746
        );
747

748
        // pretend --ban ""
749
        cli.ban.pop();
750
        state_lock.set_cli(cli.clone()).await;
751

752
        status = check_if_connection_is_allowed(
753
            state_lock.clone(),
754
            &own_handshake,
755
            &other_handshake,
756
            &peer_sa,
757
        )
758
        .await;
759
        assert_eq!(InternalConnectionStatus::Accepted, status);
760

761
        // Then check that peers can be banned by bad behavior
762
        let bad_standing: PeerStanding = PeerStanding::init(
763
            i32::MIN,
764
            Some((
765
                NegativePeerSanction::InvalidBlock((7u64.into(), Digest::default())),
766
                SystemTime::now(),
767
            )),
768
            None,
769
            i32::from(cli.peer_tolerance),
770
        );
771

772
        state_lock
773
            .lock_guard_mut()
774
            .await
775
            .net
776
            .write_peer_standing_on_decrease(peer_sa.ip(), bad_standing)
777
            .await;
778

779
        status = check_if_connection_is_allowed(
780
            state_lock.clone(),
781
            &own_handshake,
782
            &other_handshake,
783
            &peer_sa,
784
        )
785
        .await;
786
        assert_eq!(
787
            InternalConnectionStatus::Refused(ConnectionRefusedReason::BadStanding),
788
            status,
789
        );
790

791
        Ok(())
792
    }
793

794
    #[traced_test]
795
    #[apply(shared_tokio_runtime)]
796
    async fn node_refuses_reconnects_within_disconnect_cooldown_period() -> Result<()> {
797
        let network = Network::Main;
798
        let reconnect_cooldown = Duration::from_secs(8);
799
        let args = cli_args::Args {
800
            network,
801
            reconnect_cooldown,
802
            ..Default::default()
803
        };
804

805
        let (broadcast_tx, _broadcast_rx, to_main_tx, _to_main_rx, mut state_lock, handshake) =
806
            get_test_genesis_setup(network, 0, args).await?;
807

808
        // fake a graceful disconnect
809
        let node_0_address = get_dummy_socket_address(0);
810
        let node_0_handshake_data = get_dummy_handshake_data_for_genesis(network);
811
        state_lock
812
            .lock_guard_mut()
813
            .await
814
            .net
815
            .register_peer_disconnection(node_0_handshake_data.instance_id, SystemTime::now());
816

817
        // check that an immediate reconnection attempt is rejected
818
        let handshake_request = PeerMessage::Handshake(Box::new((
819
            MAGIC_STRING_REQUEST.to_vec(),
820
            node_0_handshake_data,
821
        )));
822
        let handshake_response = PeerMessage::Handshake(Box::new((
823
            MAGIC_STRING_RESPONSE.to_vec(),
824
            handshake.clone(),
825
        )));
826
        let rejected_connection = Builder::new()
827
            .read(&to_bytes(&handshake_request)?)
828
            .write(&to_bytes(&handshake_response)?)
829
            .write(&to_bytes(&PeerMessage::ConnectionStatus(
830
                TransferConnectionStatus::Refused(ConnectionRefusedReason::MaxPeerNumberExceeded),
831
            ))?)
832
            .build();
833
        let err = answer_peer_inner(
834
            rejected_connection,
835
            state_lock.clone(),
836
            node_0_address,
837
            broadcast_tx.subscribe(),
838
            to_main_tx.clone(),
839
            handshake.clone(),
840
        )
841
        .await
842
        .unwrap_err();
843
        assert!(err.to_string().contains("Refusing incoming connection."));
844

845
        // check that a reconnection attempt after some time goes through
846
        let accepted_connection = Builder::new()
847
            .wait(reconnect_cooldown)
848
            .read(&to_bytes(&handshake_request)?)
849
            .write(&to_bytes(&handshake_response)?)
850
            .write(&to_bytes(&PeerMessage::ConnectionStatus(
851
                TransferConnectionStatus::Accepted,
852
            ))?)
853
            .read(&to_bytes(&PeerMessage::Bye)?)
854
            .build();
855
        answer_peer_inner(
856
            accepted_connection,
857
            state_lock,
858
            node_0_address,
859
            broadcast_tx.subscribe(),
860
            to_main_tx,
861
            handshake,
862
        )
863
        .await?;
864

865
        Ok(())
866
    }
867

868
    #[traced_test]
869
    #[apply(shared_tokio_runtime)]
870
    async fn test_incoming_connection_succeed() -> Result<()> {
871
        // This builds a mock object which expects to have a certain
872
        // sequence of methods called on it: First it expects to have
873
        // the `MAGIC_STRING_REQUEST` and then the `MAGIC_STRING_RESPONSE`
874
        // value written. This is followed by a read of the bye message,
875
        // as this is a way to close the connection by the peer initiating
876
        // the connection. If this sequence is not followed, the `mock`
877
        // object will panic, and the `await` operator will evaluate
878
        // to Error.
879
        let network = Network::Alpha;
880
        let other_handshake = get_dummy_handshake_data_for_genesis(network);
881
        let own_handshake = get_dummy_handshake_data_for_genesis(network);
882
        let mock = Builder::new()
883
            .read(&to_bytes(&PeerMessage::Handshake(Box::new((
884
                MAGIC_STRING_REQUEST.to_vec(),
885
                other_handshake,
886
            ))))?)
887
            .write(&to_bytes(&PeerMessage::Handshake(Box::new((
888
                MAGIC_STRING_RESPONSE.to_vec(),
889
                own_handshake.clone(),
890
            ))))?)
891
            .write(&to_bytes(&PeerMessage::ConnectionStatus(
892
                TransferConnectionStatus::Accepted,
893
            ))?)
894
            .read(&to_bytes(&PeerMessage::Bye)?)
895
            .build();
896
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
897
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
898
        answer_peer_inner(
899
            mock,
900
            state_lock.clone(),
901
            get_dummy_socket_address(0),
902
            from_main_rx_clone,
903
            to_main_tx,
904
            own_handshake,
905
        )
906
        .await?;
907

908
        // Verify that peer map is empty after connection has been closed
909
        match state_lock.lock(|s| s.net.peer_map.keys().len()).await {
910
            0 => (),
911
            _ => bail!("Incorrect number of maps in peer map"),
912
        };
913

914
        Ok(())
915
    }
916

917
    #[traced_test]
918
    #[apply(shared_tokio_runtime)]
919
    async fn test_incoming_connection_fail_bad_magic_value() -> Result<()> {
920
        let network = Network::Alpha;
921
        let other_handshake = get_dummy_handshake_data_for_genesis(network);
922
        let own_handshake = get_dummy_handshake_data_for_genesis(network);
923
        let mock = Builder::new()
924
            .read(&to_bytes(&PeerMessage::Handshake(Box::new((
925
                MAGIC_STRING_RESPONSE.to_vec(),
926
                other_handshake,
927
            ))))?)
928
            .build();
929

930
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state, _hsd) =
931
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
932

933
        let answer = answer_peer_inner(
934
            mock,
935
            state,
936
            get_dummy_socket_address(0),
937
            from_main_rx_clone,
938
            to_main_tx,
939
            own_handshake,
940
        )
941
        .await;
942
        assert!(answer.is_err(), "expected bad magic value failure");
943

944
        Ok(())
945
    }
946

947
    #[traced_test]
948
    #[apply(shared_tokio_runtime)]
949
    async fn test_incoming_connection_fail_bad_network() -> Result<()> {
950
        let other_handshake = get_dummy_handshake_data_for_genesis(Network::Testnet);
951
        let own_handshake = get_dummy_handshake_data_for_genesis(Network::Alpha);
952
        let mock = Builder::new()
953
            .read(&to_bytes(&PeerMessage::Handshake(Box::new((
954
                MAGIC_STRING_REQUEST.to_vec(),
955
                other_handshake,
956
            ))))?)
957
            .write(&to_bytes(&PeerMessage::Handshake(Box::new((
958
                MAGIC_STRING_RESPONSE.to_vec(),
959
                own_handshake.clone(),
960
            ))))?)
961
            .build();
962

963
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state, _hsd) =
964
            get_test_genesis_setup(Network::Alpha, 0, cli_args::Args::default()).await?;
965

966
        let answer = answer_peer_inner(
967
            mock,
968
            state,
969
            get_dummy_socket_address(0),
970
            from_main_rx_clone,
971
            to_main_tx,
972
            own_handshake,
973
        )
974
        .await;
975
        assert!(answer.is_err(), "bad network must result in error");
976

977
        Ok(())
978
    }
979

980
    #[traced_test]
981
    #[apply(shared_tokio_runtime)]
982
    async fn test_incoming_connection_fail_bad_version() {
983
        let mut other_handshake = get_dummy_handshake_data_for_genesis(Network::Testnet);
984
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
985
            get_test_genesis_setup(Network::Alpha, 0, cli_args::Args::default())
986
                .await
987
                .unwrap();
988
        let state = state_lock.lock_guard().await;
989
        let mut own_handshake = state.get_own_handshakedata();
990

991
        // Set reported versions to something incompatible
992
        VersionString::try_from_str("0.0.3")
993
            .unwrap()
994
            .clone_into(&mut own_handshake.version);
995
        VersionString::try_from_str("0.0.0")
996
            .unwrap()
997
            .clone_into(&mut other_handshake.version);
998

999
        let peer_address = get_dummy_socket_address(55);
1000
        let connection_status = check_if_connection_is_allowed(
1001
            state_lock.clone(),
1002
            &own_handshake,
1003
            &other_handshake,
1004
            &peer_address,
1005
        )
1006
        .await;
1007
        assert_eq!(
1008
            InternalConnectionStatus::Refused(ConnectionRefusedReason::IncompatibleVersion),
1009
            connection_status,
1010
            "Connection status must be refused for incompatible version"
1011
        );
1012

1013
        // Test that the same logic is applied when going through the full connection process
1014
        let mock = Builder::new()
1015
            .read(
1016
                &to_bytes(&PeerMessage::Handshake(Box::new((
1017
                    MAGIC_STRING_REQUEST.to_vec(),
1018
                    other_handshake,
1019
                ))))
1020
                .unwrap(),
1021
            )
1022
            .write(
1023
                &to_bytes(&PeerMessage::Handshake(Box::new((
1024
                    MAGIC_STRING_RESPONSE.to_vec(),
1025
                    own_handshake.clone(),
1026
                ))))
1027
                .unwrap(),
1028
            )
1029
            .build();
1030

1031
        let answer = answer_peer_inner(
1032
            mock,
1033
            state_lock.clone(),
1034
            get_dummy_socket_address(0),
1035
            from_main_rx_clone,
1036
            to_main_tx,
1037
            own_handshake,
1038
        )
1039
        .await;
1040
        assert!(
1041
            answer.is_err(),
1042
            "incompatible version numbers must result in error in call to answer_peer"
1043
        );
1044
    }
1045

1046
    #[traced_test]
1047
    #[apply(shared_tokio_runtime)]
1048
    async fn test_incoming_connection_fail_max_peers_exceeded() -> Result<()> {
1049
        // In this scenario a node attempts to make an ingoing connection but the max
1050
        // peer count should prevent a new incoming connection from being accepted.
1051
        let network = Network::Alpha;
1052
        let other_handshake = get_dummy_handshake_data_for_genesis(network);
1053
        let own_handshake = get_dummy_handshake_data_for_genesis(network);
1054
        let mock = Builder::new()
1055
            .read(&to_bytes(&PeerMessage::Handshake(Box::new((
1056
                MAGIC_STRING_REQUEST.to_vec(),
1057
                other_handshake,
1058
            ))))?)
1059
            .write(&to_bytes(&PeerMessage::Handshake(Box::new((
1060
                MAGIC_STRING_RESPONSE.to_vec(),
1061
                own_handshake.clone(),
1062
            ))))?)
1063
            .write(&to_bytes(&PeerMessage::ConnectionStatus(
1064
                TransferConnectionStatus::Refused(ConnectionRefusedReason::MaxPeerNumberExceeded),
1065
            ))?)
1066
            .build();
1067

1068
        let (
1069
            _peer_broadcast_tx,
1070
            from_main_rx_clone,
1071
            to_main_tx,
1072
            _to_main_rx1,
1073
            mut state_lock,
1074
            _hsd,
1075
        ) = get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1076

1077
        // set max_peers to 2 to ensure failure on next connection attempt
1078
        let mut cli = state_lock.cli().clone();
1079
        cli.max_num_peers = 2;
1080
        state_lock.set_cli(cli).await;
1081

1082
        let answer = answer_peer_inner(
1083
            mock,
1084
            state_lock.clone(),
1085
            get_dummy_socket_address(2),
1086
            from_main_rx_clone,
1087
            to_main_tx,
1088
            own_handshake,
1089
        )
1090
        .await;
1091
        assert!(answer.is_err(), "max peers exceeded must result in error");
1092

1093
        Ok(())
1094
    }
1095

1096
    #[apply(shared_tokio_runtime)]
1097
    async fn allow_capping_number_of_peers_per_ip() {
1098
        let allow_5_connections_from_same_ip = cli_args::Args {
1099
            max_connections_per_ip: Some(5),
1100
            ..Default::default()
1101
        };
1102
        let (
1103
            _peer_broadcast_tx,
1104
            _from_main_rx_clone,
1105
            _to_main_tx,
1106
            _to_main_rx1,
1107
            mut state_lock,
1108
            _hsd,
1109
        ) = get_test_genesis_setup(Network::Main, 0, allow_5_connections_from_same_ip)
1110
            .await
1111
            .unwrap();
1112

1113
        let dummy_address =
1114
            |i: usize| std::net::SocketAddr::from_str(&format!("253.4.5.1:2801{i}")).unwrap();
1115
        let five_dummy_addresses = (1..=5).map(dummy_address);
1116

1117
        let own_handshake = state_lock.lock_guard().await.get_own_handshakedata();
1118

1119
        // First five connections are allowed, from the same IP.
1120
        for peer_address in five_dummy_addresses {
1121
            let peer_info = get_dummy_peer_incoming(peer_address);
1122
            let peer_handshake = get_dummy_handshake_data_for_genesis(Network::Main);
1123
            let accepted = check_if_connection_is_allowed(
1124
                state_lock.clone(),
1125
                &own_handshake,
1126
                &peer_handshake,
1127
                &peer_address,
1128
            )
1129
            .await;
1130
            assert_eq!(InternalConnectionStatus::Accepted, accepted);
1131

1132
            state_lock
1133
                .lock_guard_mut()
1134
                .await
1135
                .net
1136
                .peer_map
1137
                .insert(peer_address, peer_info.clone());
1138
        }
1139

1140
        // The next connection from the same IP is rejected, as the limit per
1141
        // IP is reached.
1142
        let sixth_peer = dummy_address(6);
1143
        let peer_handshake = get_dummy_handshake_data_for_genesis(Network::Main);
1144
        let refused = check_if_connection_is_allowed(
1145
            state_lock.clone(),
1146
            &own_handshake,
1147
            &peer_handshake,
1148
            &sixth_peer,
1149
        )
1150
        .await;
1151
        assert_eq!(
1152
            InternalConnectionStatus::Refused(ConnectionRefusedReason::MaxPeerNumberExceeded),
1153
            refused
1154
        );
1155

1156
        // But if connections per IP is not capped, allow this sixth connection.
1157
        let allow_all_ips = cli_args::Args::default();
1158
        state_lock.set_cli(allow_all_ips).await;
1159

1160
        assert_eq!(
1161
            InternalConnectionStatus::Accepted,
1162
            check_if_connection_is_allowed(
1163
                state_lock.clone(),
1164
                &own_handshake,
1165
                &peer_handshake,
1166
                &sixth_peer,
1167
            )
1168
            .await
1169
        );
1170
    }
1171

1172
    #[traced_test]
1173
    #[apply(shared_tokio_runtime)]
1174
    async fn disallow_ingoing_connections_from_banned_peers_test() -> Result<()> {
1175
        // In this scenario a peer has been banned, and is attempting to make an ingoing
1176
        // connection. This should not be possible.
1177
        let network = Network::Alpha;
1178
        let other_handshake = get_dummy_handshake_data_for_genesis(network);
1179
        let own_handshake = get_dummy_handshake_data_for_genesis(network);
1180
        let mock = Builder::new()
1181
            .read(&to_bytes(&PeerMessage::Handshake(Box::new((
1182
                MAGIC_STRING_REQUEST.to_vec(),
1183
                other_handshake,
1184
            ))))?)
1185
            .write(&to_bytes(&PeerMessage::Handshake(Box::new((
1186
                MAGIC_STRING_RESPONSE.to_vec(),
1187
                own_handshake.clone(),
1188
            ))))?)
1189
            .write(&to_bytes(&PeerMessage::ConnectionStatus(
1190
                TransferConnectionStatus::Refused(ConnectionRefusedReason::BadStanding),
1191
            ))?)
1192
            .build();
1193

1194
        let peer_count_before_incoming_connection_request = 3;
1195
        let (
1196
            _peer_broadcast_tx,
1197
            from_main_rx_clone,
1198
            to_main_tx,
1199
            _to_main_rx1,
1200
            mut state_lock,
1201
            _hsd,
1202
        ) = get_test_genesis_setup(
1203
            Network::Alpha,
1204
            peer_count_before_incoming_connection_request,
1205
            cli_args::Args::default(),
1206
        )
1207
        .await?;
1208
        let bad_standing: PeerStanding = PeerStanding::init(
1209
            i32::MIN,
1210
            Some((
1211
                NegativePeerSanction::InvalidBlock((7u64.into(), Digest::default())),
1212
                SystemTime::now(),
1213
            )),
1214
            None,
1215
            i32::from(cli_args::Args::default().peer_tolerance),
1216
        );
1217
        let peer_address = get_dummy_socket_address(3);
1218

1219
        state_lock
1220
            .lock_guard_mut()
1221
            .await
1222
            .net
1223
            .write_peer_standing_on_decrease(peer_address.ip(), bad_standing)
1224
            .await;
1225

1226
        let answer = answer_peer_inner(
1227
            mock,
1228
            state_lock.clone(),
1229
            peer_address,
1230
            from_main_rx_clone,
1231
            to_main_tx,
1232
            own_handshake,
1233
        )
1234
        .await;
1235
        assert!(
1236
            answer.is_err(),
1237
            "ingoing connection from banned peers must be disallowed"
1238
        );
1239

1240
        // Verify that peer map is empty after connection has been refused
1241
        match state_lock.lock(|s| s.net.peer_map.keys().len()).await {
1242
            3 => (),
1243
            _ => bail!("Incorrect number of maps in peer map"),
1244
        };
1245

1246
        Ok(())
1247
    }
1248
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc