• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14699768938

28 Apr 2025 03:34AM UTC coverage: 79.726% (-0.01%) from 79.739%
14699768938

push

github

dan-da
test: use localhost to avoid windows firewall

Integration tests that involve one peer connecting to another were
failing on windows (only).

This was because windows firewall by default prevents (silently?)
application attempts to bind to public interfaces.  The default value
of cli_args::Args::listen_addr is "::" which means bind to all
interfaces, and thus the bind gets blocked, even for localhost.

The fix is to set listen_addr for tests to 127.0.0.1 before starting
the node.

with this change, the tests pass, and CI should succeed for windows
once again.

0 of 1 new or added line in 1 file covered. (0.0%)

7 existing lines in 3 files now uncovered.

37185 of 46641 relevant lines covered (79.73%)

231391.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.02
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
// Batch sizes for block-batch responses. Usage is outside this view;
// presumably STANDARD is the default response size and MINIMUM the smallest
// size a peer may request — TODO confirm against the batch-request handler.
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
// Cap on the number of peers revealed in a `PeerListResponse`; responses
// exceeding this length are punished as flooding.
const MAX_PEER_LIST_LENGTH: usize = 10;
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;

// Named booleans for the `Result<bool>` returned by the peer-message handler:
// `true` means "close the connection", `false` means "keep it open".
const KEEP_CONNECTION_ALIVE: bool = false;
const DISCONNECT_CONNECTION: bool = true;

// Numeric type used to represent a peer's standing.
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP
///
/// also handles messages from main task over the main-to-peer-tasks broadcast
/// channel.
#[derive(Debug, Clone)]
pub struct PeerLoopHandler {
    // Channel for forwarding results (new blocks, transactions, peer
    // discovery answers) to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
    // Shared global state; acquired for read or write by the individual
    // message handlers.
    global_state_lock: GlobalStateLock,
    // Address of the remote peer; used for logging and as the key into
    // `net.peer_map`.
    peer_address: SocketAddr,
    // Handshake data from connection establishment — presumably received
    // from the remote peer; usage not visible here, confirm against caller.
    peer_handshake_data: HandshakeData,
    // NOTE(review): presumably true when the remote side initiated the
    // connection — usage not visible in this view, confirm.
    inbound_connection: bool,
    // Hop distance of this peer; peers it reveals are reported at
    // `distance + 1`.
    distance: u8,
    // Randomness source, e.g. for generating sync challenges; tests can
    // replace it via `set_rng`.
    rng: StdRng,
    // When set, `now()` returns this value instead of the wall clock, so
    // time-dependent behavior can be tested.
    #[cfg(test)]
    mock_now: Option<Timestamp>,
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
28✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
28✔
94
        global_state_lock: GlobalStateLock,
28✔
95
        peer_address: SocketAddr,
28✔
96
        peer_handshake_data: HandshakeData,
28✔
97
        inbound_connection: bool,
28✔
98
        distance: u8,
28✔
99
    ) -> Self {
28✔
100
        Self {
28✔
101
            to_main_tx,
28✔
102
            global_state_lock,
28✔
103
            peer_address,
28✔
104
            peer_handshake_data,
28✔
105
            inbound_connection,
28✔
106
            distance,
28✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
28✔
108
            #[cfg(test)]
28✔
109
            mock_now: None,
28✔
110
        }
28✔
111
    }
28✔
112

113
    /// Allows for mocked timestamps such that time dependencies may be tested.
    ///
    /// Identical to [`Self::new`] except that `now()` will report
    /// `mocked_time` instead of the wall clock.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        // Delegate to the primary constructor, then pin the clock.
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);
        handler
    }
135

136
    /// Overwrite the random number generator object with a specific one.
    ///
    /// Useful for derandomizing tests. The RNG is used e.g. when generating
    /// sync challenges, so a seeded `StdRng` makes those deterministic.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        self.rng = rng;
    }
143

144
    fn now(&self) -> Timestamp {
54✔
145
        #[cfg(not(test))]
54✔
146
        {
54✔
147
            Timestamp::now()
54✔
148
        }
54✔
149
        #[cfg(test)]
54✔
150
        {
54✔
151
            self.mock_now.unwrap_or(Timestamp::now())
54✔
152
        }
54✔
153
    }
54✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
199
        if sanction_result.is_err() {
18✔
200
            error!("Cannot reward banned peer");
×
201
        }
18✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
204
    }
19✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

16✔
235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
    /// valid. Use with a list of blocks or a single block. When the
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
    /// list is the `i`th block. The parent of element zero in this list is
    /// `parent_of_first_block`.
    ///
    /// # Return Value
    ///  - `Err` when the connection should be closed;
    ///  - `Ok(None)` if some block is invalid
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
    ///    are not syncing;
    ///  - `Ok(None)` if the last block has insufficient height and we are
    ///    syncing;
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
    ///    highest height in the batch.
    ///
    /// A return value of Ok(Some(_)) means that the message was passed on to
    /// main loop.
    ///
    /// # Locking
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    ///
    /// # Panics
    ///
    ///  - Panics if called with the empty list.
    async fn handle_blocks(
        &mut self,
        received_blocks: Vec<Block>,
        parent_of_first_block: Block,
    ) -> Result<Option<BlockHeight>> {
        debug!(
            "attempting to validate {} {}",
            received_blocks.len(),
            if received_blocks.len() == 1 {
                "block"
            } else {
                "blocks"
            }
        );
        let now = self.now();
        debug!("validating with respect to current timestamp {now}");
        // Walk the batch pairwise: each block is checked for proof-of-work
        // and validity against its predecessor in the list (the first one
        // against `parent_of_first_block`).
        let mut previous_block = &parent_of_first_block;
        for new_block in &received_blocks {
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
            let new_block_is_valid = new_block
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
                .await;
            debug!("new block is valid? {new_block_is_valid}");
            // Either failure mode sanctions the peer and aborts the whole
            // batch; `punish` returns `Err` when the peer is now banned.
            if !new_block_has_proof_of_work {
                warn!(
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
                warn!(
                    "Proof of work should be {} (or more) but was [{}].",
                    previous_block.kernel.header.difficulty.target(),
                    new_block.hash().values().iter().join(", ")
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block due to insufficient PoW");
                return Ok(None);
            } else if !new_block_is_valid {
                warn!(
                    "Received invalid block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block: invalid block");
                return Ok(None);
            }
            info!(
                "Block with height {} is valid. mined: {}",
                new_block.kernel.header.height,
                new_block.kernel.header.timestamp.standard_format()
            );

            previous_block = new_block;
        }

        // evaluate the fork choice rule
        debug!("Checking last block's canonicity ...");
        let last_block = received_blocks.last().unwrap();
        let is_canonical = self
            .global_state_lock
            .lock_guard()
            .await
            .incoming_block_is_more_canonical(last_block);
        let last_block_height = last_block.header().height;
        // While syncing, a batch is also acceptable when it beats the height
        // of the current sync "champion" (or no champion exists yet).
        let sync_mode_active_and_have_new_champion = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .sync_anchor
            .as_ref()
            .is_some_and(|x| {
                x.champion
                    .is_none_or(|(height, _)| height < last_block_height)
            });
        if !is_canonical && !sync_mode_active_and_have_new_champion {
            warn!(
                "Received {} blocks from peer but incoming blocks are less \
            canonical than current tip, or current sync-champion.",
                received_blocks.len()
            );
            return Ok(None);
        }

        // Send the new blocks to the main task which handles the state update
        // and storage to the database.
        let number_of_received_blocks = received_blocks.len();
        self.to_main_tx
            .send(PeerTaskToMain::NewBlocks(received_blocks))
            .await?;
        info!(
            "Updated block info by block from peer. block height {}",
            last_block_height
        );

        // Valuable, new, hard-to-produce information. Reward peer.
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
            .await?;

        Ok(Some(last_block_height))
    }
390

391
    /// Take a single block received from a peer and (attempt to) find a path
    /// between the received block and a common ancestor stored in the blocks
    /// database.
    ///
    /// This function attempts to find the parent of the received block, either
    /// by searching the database or by requesting it from a peer.
    ///  - If the parent is not stored, it is requested from the peer and the
    ///    received block is pushed to the fork reconciliation list for later
    ///    handling by this function. The fork reconciliation list starts out
    ///    empty, but grows as more parents are requested and transmitted.
    ///  - If the parent is found in the database, a) block handling continues:
    ///    the entire list of fork reconciliation blocks are passed down the
    ///    pipeline, potentially leading to a state update; and b) the fork
    ///    reconciliation list is cleared.
    ///
    /// Locking:
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    async fn try_ensure_path<S>(
        &mut self,
        received_block: Box<Block>,
        peer: &mut S,
        peer_state: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        // Does the received block match the fork reconciliation list?
        // The list is ordered newest-first, so the received block must be the
        // valid parent of the most recently appended entry.
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
            peer_state.fork_reconciliation_blocks.last()
        {
            let valid = successor
                .is_valid(
                    received_block.as_ref(),
                    self.now(),
                    self.global_state_lock.cli().network,
                )
                .await;
            if !valid {
                warn!(
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
                        peer_state.fork_reconciliation_blocks.len() + 1
                    );
            }
            valid
        } else {
            true
        };

        // Are we running out of RAM?
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
            >= self.global_state_lock.cli().sync_mode_threshold;
        if too_many_blocks {
            warn!(
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
                peer_state.fork_reconciliation_blocks.len() + 1
            );
        }

        // Block mismatch or too many blocks: abort!
        // Sanction the peer and drop all accumulated reconciliation state.
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
            self.punish(NegativePeerSanction::ForkResolutionError((
                received_block.header().height,
                peer_state.fork_reconciliation_blocks.len() as u16,
                received_block.hash(),
            )))
            .await?;
            peer_state.fork_reconciliation_blocks = vec![];
            return Ok(());
        }

        // otherwise, append
        peer_state.fork_reconciliation_blocks.push(*received_block);

        // Try fetch parent
        let received_block_header = *peer_state
            .fork_reconciliation_blocks
            .last()
            .unwrap()
            .header();

        let parent_digest = received_block_header.prev_block_digest;
        let parent_height = received_block_header.height.previous()
            .expect("transferred block must have previous height because genesis block cannot be transferred");
        debug!("Try ensure path: fetching parent block");
        let parent_block = self
            .global_state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_block(parent_digest)
            .await?;
        debug!(
            "Completed parent block fetching from DB: {}",
            if parent_block.is_some() {
                "found".to_string()
            } else {
                "not found".to_string()
            }
        );

        // If parent is not known (but not genesis) request it.
        let Some(parent_block) = parent_block else {
            // Reaching an unknown block at genesis height means the peer is
            // on a chain with a different genesis block: sanction and stop.
            if parent_height.is_genesis() {
                peer_state.fork_reconciliation_blocks.clear();
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
                return Ok(());
            }
            info!(
                "Parent not known: Requesting previous block with height {} from peer",
                parent_height
            );

            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
                .await?;

            return Ok(());
        };

        // We want to treat the received fork reconciliation blocks (plus the
        // received block) in reverse order, from oldest to newest, because
        // they were requested from high to low block height.
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
        new_blocks.reverse();

        // Reset the fork resolution state since we got all the way back to a
        // block that we have.
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
        peer_state.fork_reconciliation_blocks.clear();

        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
            // If `BlockNotification` was received during a block reconciliation
            // event, then the peer might have one (or more (unlikely)) blocks
            // that we do not have. We should thus request those blocks.
            if fork_reconciliation_event
                && peer_state.highest_shared_block_height > new_block_height
            {
                peer.send(PeerMessage::BlockRequestByHeight(
                    peer_state.highest_shared_block_height,
                ))
                .await?;
            }
        }

        Ok(())
    }
540

541
    /// Handle peer messages and returns Ok(true) if connection should be closed.
542
    /// Connection should also be closed if an error is returned.
543
    /// Otherwise, returns OK(false).
544
    ///
545
    /// Locking:
546
    ///   * Acquires `global_state_lock` for read.
547
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
548
    ///     `self.reward(..)`.
549
    async fn handle_peer_message<S>(
147✔
550
        &mut self,
147✔
551
        msg: PeerMessage,
147✔
552
        peer: &mut S,
147✔
553
        peer_state_info: &mut MutablePeerState,
147✔
554
    ) -> Result<bool>
147✔
555
    where
147✔
556
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
147✔
557
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
147✔
558
        <S as TryStream>::Error: std::error::Error,
147✔
559
    {
147✔
560
        debug!(
147✔
561
            "Received {} from peer {}",
×
562
            msg.get_type(),
×
563
            self.peer_address
564
        );
565
        match msg {
147✔
566
            PeerMessage::Bye => {
567
                // Note that the current peer is not removed from the global_state.peer_map here
568
                // but that this is done by the caller.
569
                info!("Got bye. Closing connection to peer");
40✔
570
                Ok(DISCONNECT_CONNECTION)
40✔
571
            }
572
            PeerMessage::PeerListRequest => {
573
                let peer_info = {
5✔
574
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
575

576
                    // We are interested in the address on which peers accept ingoing connections,
577
                    // not in the address in which they are connected to us. We are only interested in
578
                    // peers that accept incoming connections.
579
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
580
                        .global_state_lock
5✔
581
                        .lock_guard()
5✔
582
                        .await
5✔
583
                        .net
584
                        .peer_map
585
                        .values()
5✔
586
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
587
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
588
                        .map(|peer_info| {
8✔
589
                            (
8✔
590
                                // unwrap is safe bc of above `filter`
8✔
591
                                peer_info.listen_address().unwrap(),
8✔
592
                                peer_info.instance_id(),
8✔
593
                            )
8✔
594
                        })
8✔
595
                        .collect();
5✔
596

5✔
597
                    // We sort the returned list, so this function is easier to test
5✔
598
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
599
                    peer_info
5✔
600
                };
5✔
601

5✔
602
                debug!("Responding with: {:?}", peer_info);
5✔
603
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
604
                Ok(KEEP_CONNECTION_ALIVE)
5✔
605
            }
606
            PeerMessage::PeerListResponse(peers) => {
3✔
607
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
608

3✔
609
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
610
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
611
                        .await?;
×
612
                }
3✔
613
                self.to_main_tx
3✔
614
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
615
                        peers,
3✔
616
                        self.peer_address,
3✔
617
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
618
                        self.distance + 1,
3✔
619
                    )))
3✔
620
                    .await?;
3✔
621
                Ok(KEEP_CONNECTION_ALIVE)
3✔
622
            }
623
            PeerMessage::BlockNotificationRequest => {
624
                debug!("Got BlockNotificationRequest");
×
625

626
                peer.send(PeerMessage::BlockNotification(
×
627
                    self.global_state_lock
×
628
                        .lock_guard()
×
629
                        .await
×
630
                        .chain
631
                        .light_state()
×
632
                        .into(),
×
633
                ))
×
634
                .await?;
×
635

636
                Ok(KEEP_CONNECTION_ALIVE)
×
637
            }
638
            PeerMessage::BlockNotification(block_notification) => {
16✔
639
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
640

641
                let (tip_header, sync_anchor_is_set) = {
16✔
642
                    let state = self.global_state_lock.lock_guard().await;
16✔
643
                    (
16✔
644
                        *state.chain.light_state().header(),
16✔
645
                        state.net.sync_anchor.is_some(),
16✔
646
                    )
16✔
647
                };
16✔
648
                debug!(
16✔
649
                    "Got BlockNotification of height {}. Own height is {}",
×
650
                    block_notification.height, tip_header.height
651
                );
652

653
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
654
                let now = self.now();
16✔
655
                let time_since_latest_successful_challenge = peer_state_info
16✔
656
                    .successful_sync_challenge_response_time
16✔
657
                    .map(|then| now - then);
16✔
658
                let cooldown_expired = time_since_latest_successful_challenge
16✔
659
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
660
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
661
                    &tip_header,
16✔
662
                    block_notification.height,
16✔
663
                    block_notification.cumulative_proof_of_work,
16✔
664
                    sync_mode_threshold,
16✔
665
                );
16✔
666
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
667
                    debug!("sync mode criterion satisfied.");
1✔
668

669
                    if peer_state_info.sync_challenge.is_some() {
1✔
670
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
671
                        return Ok(KEEP_CONNECTION_ALIVE);
×
672
                    }
1✔
673

1✔
674
                    info!(
1✔
675
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
676
                    );
677
                    let challenge = SyncChallenge::generate(
1✔
678
                        &block_notification,
1✔
679
                        tip_header.height,
1✔
680
                        self.rng.random(),
1✔
681
                    );
1✔
682
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
683
                        challenge,
1✔
684
                        block_notification.cumulative_proof_of_work,
1✔
685
                        self.now(),
1✔
686
                    ));
1✔
687

1✔
688
                    debug!("sending challenge ...");
1✔
689
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
690

691
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
692
                }
15✔
693

15✔
694
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
695
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
696
                    < block_notification.cumulative_proof_of_work;
15✔
697

15✔
698
                debug!("block_is_new: {}", block_is_new);
15✔
699

700
                if block_is_new
15✔
701
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
702
                    && !sync_anchor_is_set
14✔
703
                    && !exceeds_sync_mode_threshold
14✔
704
                {
705
                    debug!(
13✔
706
                        "sending BlockRequestByHeight to peer for block with height {}",
×
707
                        block_notification.height
708
                    );
709
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
710
                        .await?;
13✔
711
                } else {
712
                    debug!(
2✔
713
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
714
                        block_notification.height,
×
715
                        block_is_new,
×
716
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
717
                    );
718
                }
719

720
                Ok(KEEP_CONNECTION_ALIVE)
15✔
721
            }
722
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
723
                let response = {
×
724
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
725

2✔
726
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
727

728
                    let response = self
2✔
729
                        .global_state_lock
2✔
730
                        .lock_guard()
2✔
731
                        .await
2✔
732
                        .response_to_sync_challenge(sync_challenge)
2✔
733
                        .await;
2✔
734

735
                    match response {
2✔
736
                        Ok(resp) => resp,
×
737
                        Err(e) => {
2✔
738
                            warn!("could not generate sync challenge response:\n{e}");
2✔
739
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
740
                                .await?;
2✔
741
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
742
                        }
743
                    }
744
                };
745

746
                info!(
×
747
                    "Responding to sync challenge from {}",
×
748
                    self.peer_address.ip()
×
749
                );
750
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
751
                    .await?;
×
752

753
                Ok(KEEP_CONNECTION_ALIVE)
×
754
            }
755
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
756
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
757

758
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
759
                info!(
1✔
760
                    "Got sync challenge response from {}",
×
761
                    self.peer_address.ip()
×
762
                );
763

764
                // The purpose of the sync challenge and sync challenge response
765
                // is to avoid going into sync mode based on a malicious target
766
                // fork. Instead of verifying that the claimed proof-of-work
767
                // number is correct (which would require sending and verifying,
768
                // at least, all blocks between luca (whatever that is) and the
769
                // claimed tip), we use a heuristic that requires less
770
                // communication and less verification work. The downside of
771
                // using a heuristic here is a nonzero false positive and false
772
                // negative rate. Note that the false negative event
773
                // (maliciously sending someone into sync mode based on a bogus
774
                // fork) still requires a significant amount of work from the
775
                // attacker, *in addition* to being lucky. Also, down the line
776
                // succinctness (and more specifically, recursive block
777
                // validation) obviates this entire subprotocol.
778

779
                // Did we issue a challenge?
780
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
781
                    warn!("Sync challenge response was not prompted.");
×
782
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
783
                        .await?;
×
784
                    return Ok(KEEP_CONNECTION_ALIVE);
×
785
                };
786

787
                // Reset the challenge, regardless of the response's success.
788
                peer_state_info.sync_challenge = None;
1✔
789

1✔
790
                // Does response match issued challenge?
1✔
791
                if !challenge_response.matches(issued_challenge) {
1✔
792
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
793
                        .await?;
×
794
                    return Ok(KEEP_CONNECTION_ALIVE);
×
795
                }
1✔
796

1✔
797
                // Does response verify?
1✔
798
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
799
                let now = self.now();
1✔
800
                if !challenge_response
1✔
801
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
802
                    .await
1✔
803
                {
804
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
805
                        .await?;
×
806
                    return Ok(KEEP_CONNECTION_ALIVE);
×
807
                }
1✔
808

809
                // Does cumulative proof-of-work evolve reasonably?
810
                let own_tip_header = *self
1✔
811
                    .global_state_lock
1✔
812
                    .lock_guard()
1✔
813
                    .await
1✔
814
                    .chain
815
                    .light_state()
1✔
816
                    .header();
1✔
817
                if !challenge_response
1✔
818
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
819
                {
820
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
821
                        .await?;
×
822
                    return Ok(KEEP_CONNECTION_ALIVE);
×
823
                }
1✔
824

1✔
825
                // Is there some specific (*i.e.*, not aggregate) proof of work?
1✔
826
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
827
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
828
                        .await?;
×
829
                    return Ok(KEEP_CONNECTION_ALIVE);
×
830
                }
1✔
831

1✔
832
                // Did it come in time?
1✔
833
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
834
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
835
                        .await?;
×
836
                    return Ok(KEEP_CONNECTION_ALIVE);
×
837
                }
1✔
838

1✔
839
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
840
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
841

1✔
842
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
843
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
844

1✔
845
                // Inform main loop
1✔
846
                self.to_main_tx
1✔
847
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
848
                        peer_address: self.peer_address,
1✔
849
                        claimed_height: claimed_tip_height,
1✔
850
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
851
                        claimed_block_mmra: sync_mmra_anchor,
1✔
852
                    })
1✔
853
                    .await?;
1✔
854

855
                Ok(KEEP_CONNECTION_ALIVE)
1✔
856
            }
857
            PeerMessage::BlockRequestByHash(block_digest) => {
×
858
                let block = self
×
859
                    .global_state_lock
×
860
                    .lock_guard()
×
861
                    .await
×
862
                    .chain
863
                    .archival_state()
×
864
                    .get_block(block_digest)
×
865
                    .await?;
×
866

867
                match block {
×
868
                    None => {
869
                        // TODO: Consider punishing here
870
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
871
                        Ok(KEEP_CONNECTION_ALIVE)
×
872
                    }
873
                    Some(b) => {
×
874
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
875
                            .await?;
×
876
                        Ok(KEEP_CONNECTION_ALIVE)
×
877
                    }
878
                }
879
            }
880
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
881
                let block_response = {
15✔
882
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
883

16✔
884
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
885

886
                    let canonical_block_digest = self
16✔
887
                        .global_state_lock
16✔
888
                        .lock_guard()
16✔
889
                        .await
16✔
890
                        .chain
891
                        .archival_state()
16✔
892
                        .archival_block_mmr
16✔
893
                        .ammr()
16✔
894
                        .try_get_leaf(block_height.into())
16✔
895
                        .await;
16✔
896

897
                    let canonical_block_digest = match canonical_block_digest {
16✔
898
                        None => {
899
                            let own_tip_height = self
1✔
900
                                .global_state_lock
1✔
901
                                .lock_guard()
1✔
902
                                .await
1✔
903
                                .chain
904
                                .light_state()
1✔
905
                                .header()
1✔
906
                                .height;
1✔
907
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
908
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
909
                                .await?;
1✔
910

911
                            return Ok(KEEP_CONNECTION_ALIVE);
1✔
912
                        }
913
                        Some(digest) => digest,
15✔
914
                    };
915

916
                    let canonical_chain_block: Block = self
15✔
917
                        .global_state_lock
15✔
918
                        .lock_guard()
15✔
919
                        .await
15✔
920
                        .chain
921
                        .archival_state()
15✔
922
                        .get_block(canonical_block_digest)
15✔
923
                        .await?
15✔
924
                        .unwrap();
15✔
925

15✔
926
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
927
                };
15✔
928

15✔
929
                debug!("Sending block");
15✔
930
                peer.send(block_response).await?;
15✔
931
                debug!("Sent block");
15✔
932
                Ok(KEEP_CONNECTION_ALIVE)
15✔
933
            }
934
            PeerMessage::Block(t_block) => {
32✔
935
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
936

32✔
937
                info!(
32✔
938
                    "Got new block from peer {}, height {}, mined {}",
×
939
                    self.peer_address,
×
940
                    t_block.header.height,
×
941
                    t_block.header.timestamp.standard_format()
×
942
                );
943
                let new_block_height = t_block.header.height;
32✔
944

945
                let block = match Block::try_from(*t_block) {
32✔
946
                    Ok(block) => Box::new(block),
32✔
947
                    Err(e) => {
×
948
                        warn!("Peer sent invalid block: {e:?}");
×
949
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
950
                            .await?;
×
951

952
                        return Ok(KEEP_CONNECTION_ALIVE);
×
953
                    }
954
                };
955

956
                // Update the value for the highest known height that peer possesses iff
957
                // we are not in a fork reconciliation state.
958
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
959
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
960
                }
22✔
961

962
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
963

964
                // Reward happens as part of `try_ensure_path`
965

966
                Ok(KEEP_CONNECTION_ALIVE)
31✔
967
            }
968
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
969
                known_blocks,
8✔
970
                max_response_len,
8✔
971
                anchor,
8✔
972
            }) => {
8✔
973
                debug!(
8✔
974
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
975
                    self.peer_address
976
                );
977

978
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
979
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
980
                        .await?;
×
981

982
                    return Ok(KEEP_CONNECTION_ALIVE);
×
983
                }
8✔
984

985
                // The last block in the list of the peers known block is the
986
                // earliest block, block with lowest height, the peer has
987
                // requested. If it does not belong to canonical chain, none of
988
                // the later will. So we can do an early abort in that case.
989
                let least_preferred = match known_blocks.last() {
8✔
990
                    Some(least_preferred) => *least_preferred,
8✔
991
                    None => {
992
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
993
                            .await?;
×
994

995
                        return Ok(KEEP_CONNECTION_ALIVE);
×
996
                    }
997
                };
998

999
                let state = self.global_state_lock.lock_guard().await;
8✔
1000
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1001
                let luca_is_known = state
8✔
1002
                    .chain
8✔
1003
                    .archival_state()
8✔
1004
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1005
                    .await;
8✔
1006
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1007
                    drop(state);
×
1008
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1009
                        .await?;
×
1010
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1011

1012
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1013
                }
8✔
1014

1015
                // Happy case: At least *one* of the blocks referenced by peer
1016
                // is known to us.
1017
                let first_block_in_response = {
8✔
1018
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1019
                    for block_digest in known_blocks {
10✔
1020
                        if state
10✔
1021
                            .chain
10✔
1022
                            .archival_state()
10✔
1023
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1024
                            .await
10✔
1025
                        {
1026
                            let height = state
8✔
1027
                                .chain
8✔
1028
                                .archival_state()
8✔
1029
                                .get_block_header(block_digest)
8✔
1030
                                .await
8✔
1031
                                .unwrap()
8✔
1032
                                .height;
8✔
1033
                            first_block_in_response = Some(height);
8✔
1034
                            debug!(
8✔
1035
                                "Found block in canonical chain for batch response: {}",
×
1036
                                block_digest
1037
                            );
1038
                            break;
8✔
1039
                        }
2✔
1040
                    }
1041

1042
                    first_block_in_response
8✔
1043
                        .expect("existence of LUCA should have been established already.")
8✔
1044
                };
8✔
1045

8✔
1046
                debug!(
8✔
1047
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1048
                 Now building response from that height."
×
1049
                );
1050

1051
                // Get the relevant blocks, at most batch-size many, descending from the
1052
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1053
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1054
                let max_response_len = cmp::min(
8✔
1055
                    max_response_len,
8✔
1056
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1057
                );
8✔
1058
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1059
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1060

8✔
1061
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1062
                let response_start_height: u64 = first_block_in_response.into();
8✔
1063
                let mut i: u64 = 1;
8✔
1064
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1065
                    let block_height = response_start_height + i;
31✔
1066
                    match state
31✔
1067
                        .chain
31✔
1068
                        .archival_state()
31✔
1069
                        .archival_block_mmr
31✔
1070
                        .ammr()
31✔
1071
                        .try_get_leaf(block_height)
31✔
1072
                        .await
31✔
1073
                    {
1074
                        Some(digest) => {
23✔
1075
                            digests_of_returned_blocks.push(digest);
23✔
1076
                        }
23✔
1077
                        None => break,
8✔
1078
                    }
1079
                    i += 1;
23✔
1080
                }
1081

1082
                let mut returned_blocks: Vec<Block> =
8✔
1083
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1084
                for block_digest in digests_of_returned_blocks {
31✔
1085
                    let block = state
23✔
1086
                        .chain
23✔
1087
                        .archival_state()
23✔
1088
                        .get_block(block_digest)
23✔
1089
                        .await?
23✔
1090
                        .unwrap();
23✔
1091
                    returned_blocks.push(block);
23✔
1092
                }
1093

1094
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1095

1096
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1097
                drop(state);
8✔
1098

1099
                let Some(response) = response else {
8✔
1100
                    warn!("Unable to satisfy batch-block request");
×
1101
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1102
                        .await?;
×
1103
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1104
                };
1105

1106
                debug!("Returning {} blocks in batch response", response.len());
8✔
1107

1108
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1109
                peer.send(response).await?;
8✔
1110

1111
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1112
            }
1113
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1114
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1115

×
1116
                debug!(
×
1117
                    "handling block response batch with {} blocks",
×
1118
                    authenticated_blocks.len()
×
1119
                );
1120

1121
                // (Alan:) why is there even a minimum?
1122
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1123
                    warn!("Got smaller batch response than allowed");
×
1124
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1125
                        .await?;
×
1126
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1127
                }
×
1128

1129
                // Verify that we are in fact in syncing mode
1130
                // TODO: Separate peer messages into those allowed under syncing
1131
                // and those that are not
1132
                let Some(sync_anchor) = self
×
1133
                    .global_state_lock
×
1134
                    .lock_guard()
×
1135
                    .await
×
1136
                    .net
1137
                    .sync_anchor
1138
                    .clone()
×
1139
                else {
1140
                    warn!("Received a batch of blocks without being in syncing mode");
×
1141
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1142
                        .await?;
×
1143
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1144
                };
1145

1146
                // Verify that the response matches the current state
1147
                // We get the latest block from the DB here since this message is
1148
                // only valid for archival nodes.
1149
                let (first_block, _) = &authenticated_blocks[0];
×
1150
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1151
                let most_canonical_own_block_match: Option<Block> = self
×
1152
                    .global_state_lock
×
1153
                    .lock_guard()
×
1154
                    .await
×
1155
                    .chain
1156
                    .archival_state()
×
1157
                    .get_block(first_blocks_parent_digest)
×
1158
                    .await
×
1159
                    .expect("Block lookup must succeed");
×
1160
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1161
                    Some(block) => block,
×
1162
                    None => {
1163
                        warn!("Got batch response with invalid start block");
×
1164
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1165
                            .await?;
×
1166
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1167
                    }
1168
                };
1169

1170
                // Convert all blocks to Block objects
1171
                debug!(
×
1172
                    "Found own block of height {} to match received batch",
×
1173
                    most_canonical_own_block_match.kernel.header.height
×
1174
                );
1175
                let mut received_blocks = vec![];
×
1176
                for (t_block, membership_proof) in authenticated_blocks {
×
1177
                    let Ok(block) = Block::try_from(t_block) else {
×
1178
                        warn!("Received invalid transfer block from peer");
×
1179
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1180
                            .await?;
×
1181
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1182
                    };
1183

1184
                    if !membership_proof.verify(
×
1185
                        block.header().height.into(),
×
1186
                        block.hash(),
×
1187
                        &sync_anchor.block_mmr.peaks(),
×
1188
                        sync_anchor.block_mmr.num_leafs(),
×
1189
                    ) {
×
1190
                        warn!("Authentication of received block fails relative to anchor");
×
1191
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1192
                            .await?;
×
1193
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1194
                    }
×
1195

×
1196
                    received_blocks.push(block);
×
1197
                }
1198

1199
                // Get the latest block that we know of and handle all received blocks
1200
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1201
                    .await?;
×
1202

1203
                // Reward happens as part of `handle_blocks`.
1204

1205
                Ok(KEEP_CONNECTION_ALIVE)
×
1206
            }
1207
            PeerMessage::UnableToSatisfyBatchRequest => {
1208
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1209
                warn!(
×
1210
                    "Peer {} reports inability to satisfy batch request.",
×
1211
                    self.peer_address
1212
                );
1213

1214
                Ok(KEEP_CONNECTION_ALIVE)
×
1215
            }
1216
            PeerMessage::Handshake(_) => {
1217
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1218

×
1219
                // The handshake should have been sent during connection
×
1220
                // initialization. Here it is out of order at best, malicious at
×
1221
                // worst.
×
1222
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1223
                Ok(KEEP_CONNECTION_ALIVE)
×
1224
            }
1225
            PeerMessage::ConnectionStatus(_) => {
1226
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1227

×
1228
                // The connection status should have been sent during connection
×
1229
                // initialization. Here it is out of order at best, malicious at
×
1230
                // worst.
×
1231

×
1232
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1233
                Ok(KEEP_CONNECTION_ALIVE)
×
1234
            }
1235
            PeerMessage::Transaction(transaction) => {
5✔
1236
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
5✔
1237

5✔
1238
                debug!(
5✔
1239
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1240
                    transaction.kernel.inputs.len(),
×
1241
                    transaction.kernel.outputs.len(),
×
1242
                    transaction.kernel.mutator_set_hash
×
1243
                );
1244

1245
                let transaction: Transaction = (*transaction).into();
5✔
1246

5✔
1247
                // 1. If transaction is invalid, punish.
5✔
1248
                if !transaction
5✔
1249
                    .is_valid(self.global_state_lock.cli().network)
5✔
1250
                    .await
5✔
1251
                {
1252
                    warn!("Received invalid tx");
×
1253
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1254
                        .await?;
×
1255
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1256
                }
5✔
1257

5✔
1258
                // 2. If transaction has coinbase, punish.
5✔
1259
                // Transactions received from peers have not been mined yet.
5✔
1260
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
5✔
1261
                if transaction.kernel.coinbase.is_some() {
5✔
1262
                    warn!("Received non-mined transaction with coinbase.");
×
1263
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1264
                        .await?;
×
1265
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1266
                }
5✔
1267

5✔
1268
                // 3. If negative fee, punish.
5✔
1269
                if transaction.kernel.fee.is_negative() {
5✔
1270
                    warn!("Received negative-fee transaction.");
×
1271
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1272
                        .await?;
×
1273
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1274
                }
5✔
1275

5✔
1276
                // 4. If transaction is already known, ignore.
5✔
1277
                if self
5✔
1278
                    .global_state_lock
5✔
1279
                    .lock_guard()
5✔
1280
                    .await
5✔
1281
                    .mempool
1282
                    .contains_with_higher_proof_quality(
1283
                        transaction.kernel.txid(),
5✔
1284
                        transaction.proof.proof_quality()?,
5✔
1285
                    )
1286
                {
1287
                    warn!("Received transaction that was already known");
×
1288

1289
                    // We received a transaction that we *probably* haven't requested.
1290
                    // Consider punishing here, if this is abused.
1291
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1292
                }
5✔
1293

1294
                // 5. if transaction is not confirmable, punish.
1295
                let (tip, mutator_set_accumulator_after) = {
5✔
1296
                    let state = self.global_state_lock.lock_guard().await;
5✔
1297

1298
                    (
5✔
1299
                        state.chain.light_state().hash(),
5✔
1300
                        state.chain.light_state().mutator_set_accumulator_after(),
5✔
1301
                    )
5✔
1302
                };
5✔
1303
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
5✔
1304
                    warn!(
×
1305
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1306
                        transaction.kernel.txid()
×
1307
                    );
1308
                    // get fine-grained error code for informative logging
1309
                    let confirmability_error_code = transaction
×
1310
                        .kernel
×
1311
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1312
                    match confirmability_error_code {
×
1313
                        Ok(_) => unreachable!(),
×
1314
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1315
                            warn!("invalid removal record (at index {index})");
×
1316
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1317
                            let removal_record_error_code = invalid_removal_record
×
1318
                                .validate_inner(&mutator_set_accumulator_after);
×
1319
                            debug!(
×
1320
                                "Absolute index set of removal record {index}: {:?}",
×
1321
                                invalid_removal_record.absolute_indices
1322
                            );
1323
                            match removal_record_error_code {
×
1324
                                Ok(_) => unreachable!(),
×
1325
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1326
                                    debug!("invalid because membership proof is missing");
×
1327
                                }
1328
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1329
                                    chunk_index,
×
1330
                                }) => {
×
1331
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1332
                                }
1333
                            };
1334
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1335
                                .await?;
×
1336
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1337
                        }
1338
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1339
                            warn!("duplicate inputs");
×
1340
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1341
                                .await?;
×
1342
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1343
                        }
1344
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1345
                            warn!("already spent input (at index {index})");
×
1346
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1347
                                .await?;
×
1348
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1349
                        }
1350
                    };
1351
                }
5✔
1352

5✔
1353
                // If transaction cannot be applied to mutator set, punish.
5✔
1354
                // I don't think this can happen when above checks pass but we include
5✔
1355
                // the check to ensure that transaction can be applied.
5✔
1356
                let ms_update = MutatorSetUpdate::new(
5✔
1357
                    transaction.kernel.inputs.clone(),
5✔
1358
                    transaction.kernel.outputs.clone(),
5✔
1359
                );
5✔
1360
                let can_apply = ms_update
5✔
1361
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
5✔
1362
                    .is_ok();
5✔
1363
                if !can_apply {
5✔
1364
                    warn!("Cannot apply transaction to current mutator set.");
×
1365
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1366
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1367
                        .await?;
×
1368
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1369
                }
5✔
1370

5✔
1371
                let tx_timestamp = transaction.kernel.timestamp;
5✔
1372

5✔
1373
                // 6. Ignore if transaction is too old
5✔
1374
                let now = self.now();
5✔
1375
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
5✔
1376
                    // TODO: Consider punishing here
1377
                    warn!("Received too old tx");
×
1378
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1379
                }
5✔
1380

5✔
1381
                // 7. Ignore if transaction is too far into the future
5✔
1382
                if tx_timestamp
5✔
1383
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
5✔
1384
                {
1385
                    // TODO: Consider punishing here
1386
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1387
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1388
                }
5✔
1389

5✔
1390
                // Otherwise, relay to main
5✔
1391
                let pt2m_transaction = PeerTaskToMainTransaction {
5✔
1392
                    transaction,
5✔
1393
                    confirmable_for_block: tip,
5✔
1394
                };
5✔
1395
                self.to_main_tx
5✔
1396
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
5✔
1397
                    .await?;
5✔
1398

1399
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1400
            }
1401
            PeerMessage::TransactionNotification(tx_notification) => {
12✔
1402
                // addresses #457
12✔
1403
                // new scope for state read-lock to avoid holding across peer.send()
12✔
1404
                {
12✔
1405
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
12✔
1406

1407
                    // 1. Ignore if we already know this transaction, and
1408
                    // the proof quality is not higher than what we already know.
1409
                    let state = self.global_state_lock.lock_guard().await;
12✔
1410
                    let transaction_of_same_or_higher_proof_quality_is_known =
12✔
1411
                        state.mempool.contains_with_higher_proof_quality(
12✔
1412
                            tx_notification.txid,
12✔
1413
                            tx_notification.proof_quality,
12✔
1414
                        );
12✔
1415
                    if transaction_of_same_or_higher_proof_quality_is_known {
12✔
1416
                        debug!("transaction with same or higher proof quality was already known");
7✔
1417
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1418
                    }
5✔
1419

5✔
1420
                    // Only accept transactions that do not require executing
5✔
1421
                    // `update`.
5✔
1422
                    if state
5✔
1423
                        .chain
5✔
1424
                        .light_state()
5✔
1425
                        .mutator_set_accumulator_after()
5✔
1426
                        .hash()
5✔
1427
                        != tx_notification.mutator_set_hash
5✔
1428
                    {
1429
                        debug!("transaction refers to non-canonical mutator set state");
×
1430
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1431
                    }
5✔
1432
                }
5✔
1433

5✔
1434
                // 2. Request the actual `Transaction` from peer
5✔
1435
                debug!("requesting transaction from peer");
5✔
1436
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
5✔
1437
                    .await?;
5✔
1438

1439
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1440
            }
1441
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1442
                let state = self.global_state_lock.lock_guard().await;
5✔
1443
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1444
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1445
                };
1446

1447
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1448
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1449
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1450
                };
1451

1452
                // Drop state immediately to prevent holding over a response.
1453
                drop(state);
4✔
1454

4✔
1455
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1456
                    .await?;
4✔
1457

1458
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1459
            }
1460
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1461
                let verdict = self
1✔
1462
                    .global_state_lock
1✔
1463
                    .lock_guard()
1✔
1464
                    .await
1✔
1465
                    .favor_incoming_block_proposal(
1✔
1466
                        block_proposal_notification.height,
1✔
1467
                        block_proposal_notification.guesser_fee,
1✔
1468
                    );
1✔
1469
                match verdict {
1✔
1470
                    Ok(_) => {
1471
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1472
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1473
                        ))
1✔
1474
                        .await?
1✔
1475
                    }
1476
                    Err(reject_reason) => {
×
1477
                        info!(
×
1478
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1479
                        {}. Reason:\n{reject_reason}",
×
1480
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1481
                        self.peer_address
1482
                    )
1483
                    }
1484
                }
1485

1486
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1487
            }
1488
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1489
                let matching_proposal = self
×
1490
                    .global_state_lock
×
1491
                    .lock_guard()
×
1492
                    .await
×
1493
                    .mining_state
1494
                    .block_proposal
1495
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1496
                    .map(|x| x.to_owned());
×
1497
                if let Some(proposal) = matching_proposal {
×
1498
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1499
                        .await?;
×
1500
                } else {
1501
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1502
                        .await?;
×
1503
                }
1504

1505
                Ok(KEEP_CONNECTION_ALIVE)
×
1506
            }
1507
            PeerMessage::BlockProposal(block) => {
1✔
1508
                info!("Got block proposal from peer.");
1✔
1509

1510
                let should_punish = {
1✔
1511
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1512

1513
                    let (verdict, tip) = {
1✔
1514
                        let state = self.global_state_lock.lock_guard().await;
1✔
1515

1516
                        let verdict = state.favor_incoming_block_proposal(
1✔
1517
                            block.header().height,
1✔
1518
                            block.total_guesser_reward(),
1✔
1519
                        );
1✔
1520
                        let tip = state.chain.light_state().to_owned();
1✔
1521
                        (verdict, tip)
1✔
1522
                    };
1523

1524
                    if let Err(rejection_reason) = verdict {
1✔
1525
                        match rejection_reason {
×
1526
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1527
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1528
                                if Some(received) == current =>
×
1529
                            {
×
1530
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1531
                                None
×
1532
                            }
1533
                            _ => {
1534
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1535
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1536
                            }
1537
                        }
1538
                    } else {
1539
                        // Verify validity and that proposal is child of current tip
1540
                        if block
1✔
1541
                            .is_valid(&tip, self.now(), self.global_state_lock.cli().network)
1✔
1542
                            .await
1✔
1543
                        {
1544
                            None // all is well, no punishment.
1✔
1545
                        } else {
1546
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1547
                        }
1548
                    }
1549
                };
1550

1551
                if let Some(sanction) = should_punish {
1✔
1552
                    self.punish(sanction).await?;
×
1553
                } else {
1554
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1555
                        .await?;
1✔
1556

1557
                    // Valuable, new, hard-to-produce information. Reward peer.
1558
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1559
                }
1560

1561
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1562
            }
1563
        }
1564
    }
147✔
1565

1566
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1567
    ///
1568
    /// the channel could potentially fill up in which case the send() will
1569
    /// block until there is capacity.  we wrap the send() so we can log if
1570
    /// that ever happens to the extent it passes slow-scope threshold.
1571
    async fn send_to_main(
1✔
1572
        &self,
1✔
1573
        msg: PeerTaskToMain,
1✔
1574
        line: u32,
1✔
1575
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1576
        // we measure across the send() in case the channel ever fills up.
1✔
1577
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1578

1✔
1579
        self.to_main_tx.send(msg).await
1✔
1580
    }
1✔
1581

1582
    /// Handle message from main task. The boolean return value indicates if
    /// the connection should be closed, i.e. `Ok(DISCONNECT_CONNECTION)`
    /// makes the caller exit the peer loop, `Ok(KEEP_CONNECTION_ALIVE)` keeps
    /// the loop running.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write via Self::punish()
    async fn handle_main_task_message<S>(
        &mut self,
        msg: MainToPeerTask,
        peer: &mut S,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<bool>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        debug!("Handling {} message from main in peer loop", msg.get_type());
        match msg {
            MainToPeerTask::Block(block) => {
                // We don't currently differentiate whether a new block came from a peer, or from our
                // own miner. It's always shared through this logic.
                let new_block_height = block.kernel.header.height;
                // Only notify the peer if this block is higher than what we
                // last shared with it; avoids re-announcing known blocks.
                if new_block_height > peer_state_info.highest_shared_block_height {
                    debug!("Sending PeerMessage::BlockNotification");
                    peer_state_info.highest_shared_block_height = new_block_height;
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
                        .await?;
                    debug!("Sent PeerMessage::BlockNotification");
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
                // Only ask one of the peers about the batch of blocks
                if batch_block_request.peer_addr_target != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                // Cap the requested batch size by both the protocol standard
                // and the locally configured sync-mode threshold.
                let max_response_len = std::cmp::min(
                    STANDARD_BLOCK_BATCH_SIZE,
                    self.global_state_lock.cli().sync_mode_threshold,
                );

                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: batch_block_request.known_blocks,
                    max_response_len,
                    anchor: batch_block_request.anchor_mmr,
                }))
                .await?;

                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");

                // Timeout notice is addressed to a specific peer; ignore if
                // it is not this one.
                if self.peer_address != socket_addr {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                self.punish(NegativePeerSanction::SynchronizationTimeout)
                    .await?;

                // If this peer failed the last synchronization attempt, we only
                // sanction, we don't disconnect.
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::MakePeerDiscoveryRequest => {
                // Main wants fresh peer candidates; ask this peer for its
                // peer list.
                peer.send(PeerMessage::PeerListRequest).await?;
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::Disconnect(peer_address) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");

                // Only disconnect from the peer the main task requested a disconnect for.
                if peer_address != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }
                // Record the graceful disconnect before signaling loop exit.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::DisconnectAll() => {
                // Main is tearing down all peer connections; record the
                // graceful disconnect for this one and exit the loop.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
                // Peer-list request targeted at one specific peer; only act
                // if this task owns that connection.
                if target_socket_addr == self.peer_address {
                    peer.send(PeerMessage::PeerListRequest).await?;
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::TransactionNotification(transaction_notification) => {
                // Relay a transaction notification from main out to the peer.
                debug!("Sending PeerMessage::TransactionNotification");
                peer.send(PeerMessage::TransactionNotification(
                    transaction_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::TransactionNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
                // Relay a block-proposal notification from main out to the peer.
                debug!("Sending PeerMessage::BlockProposalNotification");
                peer.send(PeerMessage::BlockProposalNotification(
                    block_proposal_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::BlockProposalNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
        }
    }
1693

1694
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
    /// or a message from main over the main-to-peer-tasks broadcast channel.
    ///
    /// Exits the loop (returning `Ok(())`) when the peer closes the
    /// connection, when a handler requests disconnection, or when main
    /// requests it; returns `Err` on stream errors or handler errors so the
    /// caller can distinguish failure from a clean shutdown.
    async fn run<S>(
        &mut self,
        mut peer: S,
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        loop {
            select! {
                // Handle peer messages
                peer_message = peer.try_next() => {
                    let peer_address = self.peer_address;
                    let peer_message = match peer_message {
                        Ok(message) => message,
                        Err(err) => {
                            let msg = format!("Error when receiving from peer: {peer_address}");
                            error!("{msg}. Error: {err}");
                            bail!("{msg}. Closing connection.");
                        }
                    };
                    // `None` from try_next() means the stream ended: the peer
                    // closed the connection, so end the loop cleanly.
                    let Some(peer_message) = peer_message else {
                        info!("Peer {peer_address} closed connection.");
                        break;
                    };

                    // Some message types are only meaningful inside or outside
                    // of sync mode; drop (continue) mismatched ones here so
                    // handlers never see them.
                    let syncing =
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
                    let message_type = peer_message.get_type();
                    if peer_message.ignore_during_sync() && syncing {
                        debug!(
                            "Ignoring {message_type} message when syncing, from {peer_address}",
                        );
                        continue;
                    }
                    if peer_message.ignore_when_not_sync() && !syncing {
                        debug!(
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
                        );
                        continue;
                    }

                    // Dispatch: Ok(true) means the handler asked to close the
                    // connection; Err propagates and tears the loop down.
                    match self
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
                        .await
                    {
                        Ok(false) => {}
                        Ok(true) => {
                            info!("Closing connection to {peer_address}");
                            break;
                        }
                        Err(err) => {
                            warn!("Closing connection to {peer_address} because of error {err}.");
                            bail!("{err}");
                        }
                    };
                }

                // Handle messages from main task
                main_msg_res = from_main_rx.recv() => {
                    // A failed recv() on the broadcast channel means main is
                    // gone; this task cannot continue, so panic.
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
                        let err_msg = format!("Failed to read from main loop: {err}");
                        error!(err_msg);
                        panic!("{err_msg}");
                    });
                    // A handler error is treated as a request to close the
                    // connection (mapped to `true`), not as a task failure.
                    let close_connection = self
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
                        .await
                        .unwrap_or_else(|err| {
                            warn!("handle_main_task_message returned an error: {err}");
                            true
                        });

                    if close_connection {
                        info!(
                            "handle_main_task_message is closing the connection to {}",
                            self.peer_address
                        );
                        break;
                    }
                }
            }
        }

        Ok(())
    }
1785

1786
    /// Function called before entering the peer loop. Reads the potentially stored
    /// peer standing from the database and does other book-keeping before entering
    /// its final resting place: the `peer_loop`. Note that the peer has already been
    /// accepted for a connection for this loop to be entered. So we don't need
    /// to check the standing again.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write
    pub(crate) async fn run_wrapper<S>(
        &mut self,
        mut peer: S,
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        // Clock skew (in either direction) beyond this many seconds triggers
        // a warning below; no sanction is applied.
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;

        let cli_args = self.global_state_lock.cli().clone();

        // Restore any previously recorded standing for this peer's IP; a peer
        // we have never seen before starts with a fresh standing.
        let standing = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_databases
            .peer_standings
            .get(self.peer_address.ip())
            .await
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));

        // Add peer to peer map
        let peer_connection_info = PeerConnectionInfo::new(
            self.peer_handshake_data.listen_port,
            self.peer_address,
            self.inbound_connection,
        );
        let new_peer = PeerInfo::new(
            peer_connection_info,
            &self.peer_handshake_data,
            SystemTime::now(),
            cli_args.peer_tolerance,
        )
        .with_standing(standing);

        // If timestamps are different, we currently just log a warning.
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
        {
            let own_datetime_utc: DateTime<Utc> =
                new_peer.own_timestamp_connection_established.into();
            let peer_datetime_utc: DateTime<Utc> =
                new_peer.peer_timestamp_connection_established.into();
            warn!(
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
                new_peer.connected_address(),
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
        }

        // Multiple tasks might attempt to set up a connection concurrently. So
        // even though we've checked that this connection is allowed, this check
        // could have been invalidated by another task, for one accepting an
        // incoming connection from a peer we're currently connecting to. So we
        // need to make the check again while holding a write-lock, since
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
        // since it would have to be dropped before acquiring the write-lock.
        {
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
            let peer_map = &mut global_state.net.peer_map;
            if peer_map
                .values()
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
            {
                bail!("Attempted to connect to already connected peer. Aborting connection.");
            }

            if peer_map.len() >= cli_args.max_num_peers {
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
            }

            if peer_map.contains_key(&self.peer_address) {
                // This shouldn't be possible, unless the peer reports a different instance ID than
                // for the other connection. Only a malignant client would do that.
                bail!("Already connected to peer. Aborting connection");
            }

            peer_map.insert(self.peer_address, new_peer);
        }

        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);

        // If peer indicates more canonical block, request a block notification to catch up ASAP
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
            > self
                .global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work
        {
            // Send block notification request to catch up ASAP, in case we're
            // behind the newly-connected peer.
            peer.send(PeerMessage::BlockNotificationRequest).await?;
        }

        let res = self.run(peer, from_main_rx, &mut peer_state).await;
        debug!("Exited peer loop for {}", self.peer_address);

        // Unconditionally clean up (remove from peer map, notify main), even
        // when `run` returned an error.
        close_peer_connected_callback(
            self.global_state_lock.clone(),
            self.peer_address,
            &self.to_main_tx,
        )
        .await;

        debug!("Ending peer loop for {}", self.peer_address);

        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
        // feature to have for testing purposes.
        res
    }
1916

1917
    /// Register graceful peer disconnection in the global state.
1918
    ///
1919
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1920
    ///
1921
    /// # Locking:
1922
    ///   * acquires `global_state_lock` for write
1923
    ///
1924
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1925
    async fn register_peer_disconnection(&mut self) {
×
1926
        let peer_id = self.peer_handshake_data.instance_id;
×
1927
        self.global_state_lock
×
1928
            .lock_guard_mut()
×
1929
            .await
×
1930
            .net
1931
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1932
    }
×
1933
}
1934

1935
#[cfg(test)]
1936
mod peer_loop_tests {
1937
    use macro_rules_attr::apply;
1938
    use rand::rngs::StdRng;
1939
    use rand::Rng;
1940
    use rand::SeedableRng;
1941
    use tokio::sync::mpsc::error::TryRecvError;
1942
    use tracing_test::traced_test;
1943

1944
    use super::*;
1945
    use crate::config_models::cli_args;
1946
    use crate::config_models::network::Network;
1947
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
1948
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1949
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1950
    use crate::models::peer::transaction_notification::TransactionNotification;
1951
    use crate::models::state::mempool::TransactionOrigin;
1952
    use crate::models::state::tx_creation_config::TxCreationConfig;
1953
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1954
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1955
    use crate::tests::shared::fake_valid_block_for_tests;
1956
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1957
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1958
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1959
    use crate::tests::shared::get_dummy_socket_address;
1960
    use crate::tests::shared::get_test_genesis_setup;
1961
    use crate::tests::shared::invalid_empty_single_proof_transaction;
1962
    use crate::tests::shared::Action;
1963
    use crate::tests::shared::Mock;
1964
    use crate::tests::shared_tokio_runtime;
1965

1966
    #[traced_test]
×
1967
    #[apply(shared_tokio_runtime)]
1968
    async fn test_peer_loop_bye() -> Result<()> {
1969
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1970

1971
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1972
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1973

1974
        let peer_address = get_dummy_socket_address(2);
1975
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1976
        let mut peer_loop_handler =
1977
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1978
        peer_loop_handler
1979
            .run_wrapper(mock, from_main_rx_clone)
1980
            .await?;
1981

1982
        assert_eq!(
1983
            2,
1984
            state_lock.lock_guard().await.net.peer_map.len(),
1985
            "peer map length must be back to 2 after goodbye"
1986
        );
1987

1988
        Ok(())
1989
    }
1990

1991
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn test_peer_loop_peer_list() {
        // Verify that a `PeerListRequest` is answered with a `PeerListResponse`
        // containing all currently connected peers (the two setup peers plus
        // the requesting peer itself).
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
                .await
                .unwrap();

        // Collect the two pre-registered peers; sort for a deterministic order
        // in the expected response.
        let mut peer_infos = state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .clone()
            .into_values()
            .collect::<Vec<_>>();
        peer_infos.sort_by_cached_key(|x| x.connected_address());
        let (peer_address0, instance_id0) = (
            peer_infos[0].connected_address(),
            peer_infos[0].instance_id(),
        );
        let (peer_address1, instance_id1) = (
            peer_infos[1].connected_address(),
            peer_infos[1].instance_id(),
        );

        // A third peer (the one running this peer loop) issues the request.
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
        let expected_response = vec![
            (peer_address0, instance_id0),
            (peer_address1, instance_id1),
            (sa2, hsd2.instance_id),
        ];
        // The mock enforces this exact read/write message order.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::PeerListRequest),
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        let mut peer_loop_handler =
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        // After `Bye`, the requesting peer must be deregistered again.
        assert_eq!(
            2,
            state_lock.lock_guard().await.net.peer_map.len(),
            "peer map must have length 2 after saying goodbye to peer 2"
        );
    }
2044

2045
    #[traced_test]
×
2046
    #[apply(shared_tokio_runtime)]
2047
    async fn different_genesis_test() -> Result<()> {
2048
        // In this scenario a peer provides another genesis block than what has been
2049
        // hardcoded. This should lead to the closing of the connection to this peer
2050
        // and a ban.
2051

2052
        let network = Network::Main;
2053
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2054
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2055
        assert_eq!(1000, state_lock.cli().peer_tolerance);
2056
        let peer_address = get_dummy_socket_address(0);
2057

2058
        // Although the database is empty, `get_latest_block` still returns the genesis block,
2059
        // since that block is hardcoded.
2060
        let mut different_genesis_block = state_lock
2061
            .lock_guard()
2062
            .await
2063
            .chain
2064
            .archival_state()
2065
            .get_tip()
2066
            .await;
2067

2068
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2069
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2070
            &different_genesis_block,
2071
            Timestamp::hours(1),
2072
            StdRng::seed_from_u64(5550001).random(),
2073
            network,
2074
        )
2075
        .await;
2076
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2077
            block_1_with_different_genesis.try_into().unwrap(),
2078
        )))]);
2079

2080
        let mut peer_loop_handler = PeerLoopHandler::new(
2081
            to_main_tx.clone(),
2082
            state_lock.clone(),
2083
            peer_address,
2084
            hsd,
2085
            true,
2086
            1,
2087
        );
2088
        let res = peer_loop_handler
2089
            .run_wrapper(mock, from_main_rx_clone)
2090
            .await;
2091
        assert!(
2092
            res.is_err(),
2093
            "run_wrapper must return failure when genesis is different"
2094
        );
2095

2096
        match to_main_rx1.recv().await {
2097
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2098
            _ => bail!("Must receive remove of peer block max height"),
2099
        }
2100

2101
        // Verify that no further message was sent to main loop
2102
        match to_main_rx1.try_recv() {
2103
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2104
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2105
        };
2106

2107
        drop(to_main_tx);
2108

2109
        let peer_standing = state_lock
2110
            .lock_guard()
2111
            .await
2112
            .net
2113
            .get_peer_standing_from_database(peer_address.ip())
2114
            .await;
2115
        assert_eq!(
2116
            -i32::from(state_lock.cli().peer_tolerance),
2117
            peer_standing.unwrap().standing
2118
        );
2119
        assert_eq!(
2120
            NegativePeerSanction::DifferentGenesis,
2121
            peer_standing.unwrap().latest_punishment.unwrap().0
2122
        );
2123

2124
        Ok(())
2125
    }
2126

2127
    #[traced_test]
×
2128
    #[apply(shared_tokio_runtime)]
2129
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
2130
    {
2131
        let args = cli_args::Args::default();
2132
        let network = args.network;
2133
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
2134
            get_test_genesis_setup(network, 0, args).await?;
2135

2136
        let peer_address = get_dummy_socket_address(0);
2137
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
2138
        let peer_id = peer_handshake_data.instance_id;
2139
        let mut peer_loop_handler = PeerLoopHandler::new(
2140
            to_main_tx,
2141
            state_lock.clone(),
2142
            peer_address,
2143
            peer_handshake_data,
2144
            true,
2145
            1,
2146
        );
2147
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2148
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
2149

2150
        let global_state = state_lock.lock_guard().await;
2151
        assert!(global_state
2152
            .net
2153
            .last_disconnection_time_of_peer(peer_id)
2154
            .is_none());
2155

2156
        drop(to_main_rx);
2157
        drop(from_main_tx);
2158

2159
        Ok(())
2160
    }
2161

2162
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_without_valid_pow_test() -> Result<()> {
        // In this scenario, a block without a valid PoW is received. This block should be rejected
        // by the peer loop and a notification should never reach the main loop.

        let network = Network::Main;
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let peer_address = get_dummy_socket_address(0);
        let genesis_block: Block = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_tip()
            .await;

        // Make a block whose hash lies above the threshold implied by the
        // difficulty, i.e. a block without valid proof-of-work.
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;

        // This *probably* is invalid PoW -- and needs to be for this test to
        // work.
        block_without_valid_pow.set_header_nonce(Digest::default());

        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_without_valid_pow.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        // Mock the clock to the block's timestamp so the block is not rejected
        // for being from the future.
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            true,
            1,
            block_without_valid_pow.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .expect("sending (one) invalid block should not result in closed connection");

        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
            _ => bail!("Must receive remove of peer block max height"),
        }

        // Verify that no further message was sent to main loop
        match to_main_rx1.try_recv() {
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
        };

        // We need to have the transmitter in scope until we have received from it
        // otherwise the receiver will report the disconnected error when we attempt
        // to read from it. And the purpose is to verify that the channel is empty,
        // not that it has been closed.
        drop(to_main_tx);

        // Verify that peer standing was stored in database
        let standing = state_lock
            .lock_guard()
            .await
            .net
            .peer_databases
            .peer_standings
            .get(peer_address.ip())
            .await
            .unwrap();
        assert!(
            standing.standing < 0,
            "Peer must be sanctioned for sending a bad block"
        );

        Ok(())
    }
2252

2253
    #[traced_test]
×
2254
    #[apply(shared_tokio_runtime)]
2255
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2256
        // The scenario tested here is that a client receives a block that is already
2257
        // known and stored. The expected behavior is to ignore the block and not send
2258
        // a message to the main task.
2259

2260
        let network = Network::Main;
2261
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
2262
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2263
        let peer_address = get_dummy_socket_address(0);
2264
        let genesis_block: Block = Block::genesis(network);
2265

2266
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
2267
        let block_1 =
2268
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2269
        assert!(
2270
            block_1.is_valid(&genesis_block, now, network).await,
2271
            "Block must be valid for this test to make sense"
2272
        );
2273
        alice.set_new_tip(block_1.clone()).await?;
2274

2275
        let mock_peer_messages = Mock::new(vec![
2276
            Action::Read(PeerMessage::Block(Box::new(
2277
                block_1.clone().try_into().unwrap(),
2278
            ))),
2279
            Action::Read(PeerMessage::Bye),
2280
        ]);
2281

2282
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2283

2284
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2285
            to_main_tx.clone(),
2286
            alice.clone(),
2287
            peer_address,
2288
            hsd,
2289
            false,
2290
            1,
2291
            block_1.header().timestamp,
2292
        );
2293
        alice_peer_loop_handler
2294
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
2295
            .await?;
2296

2297
        match to_main_rx1.recv().await {
2298
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2299
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2300
        }
2301
        match to_main_rx1.try_recv() {
2302
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2303
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2304
        };
2305
        drop(to_main_tx);
2306

2307
        if !alice.lock_guard().await.net.peer_map.is_empty() {
2308
            bail!("peer map must be empty after closing connection gracefully");
2309
        }
2310

2311
        Ok(())
2312
    }
2313

2314
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_request_batch_simple() {
        // Scenario: Six blocks (including genesis) are known. Peer requests
        // from all possible starting points, and client responds with the
        // correct list of blocks.
        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default())
                .await
                .unwrap();
        let genesis_block: Block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        let [block_1, block_2, block_3, block_4, block_5] =
            fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;
        let blocks = vec![
            genesis_block,
            block_1,
            block_2,
            block_3,
            block_4,
            block_5.clone(),
        ];
        // Install blocks 1..=5 as tips; genesis (index 0) is already stored.
        for block in blocks.iter().skip(1) {
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
        }

        // Anchor for the batch request: the archival block-MMR accumulator.
        let mmra = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        // One request per possible starting block (genesis through block 4);
        // expected response contains all blocks after the starting point.
        for i in 0..=4 {
            let expected_response = {
                let state = state_lock.lock_guard().await;
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
                    .await
                    .unwrap()
            };
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![blocks[i].hash()],
                    max_response_len: 14,
                    anchor: mmra.clone(),
                })),
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
                Action::Read(PeerMessage::Bye),
            ]);
            let mut peer_loop_handler = PeerLoopHandler::new(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd.clone(),
                false,
                1,
            );

            peer_loop_handler
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
                .await
                .unwrap();
        }
    }
2388

2389
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_request_batch_in_order_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
        // are returned.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block: Block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        // Chain a: 1 -> 2_a -> 3_a.
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;
        // Chain b forks off block 1: 2_b -> 3_b.
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
            network,
        )
        .await;
        assert_ne!(block_2_b.hash(), block_2_a.hash());

        // Install both forks; 3_a is set last, making chain a canonical.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        let anchor = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        // Expected response when the peer only knows genesis: the full
        // canonical chain 1 -> 2_a -> 3_a.
        let response_1 = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
                &anchor,
            )
            .await
            .unwrap()
        };

        let mut mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![genesis_block.hash()],
                max_response_len: 14,
                anchor: anchor.clone(),
            })),
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd.clone(),
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_1
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
            .await?;

        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
        let response_2 = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_2_a, block_3_a.clone()],
                &anchor,
            )
            .await
            .unwrap()
        };
        mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
                max_response_len: 14,
                anchor,
            })),
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_2
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
2504

2505
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_request_batch_out_of_order_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
        // The blocks will be supplied in the correct order but starting from the first digest in
        // the list that is known and canonical.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        // Chain a: 1 -> 2_a -> 3_a.
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;
        // Chain b forks off block 1: 2_b -> 3_b.
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
            network,
        )
        .await;
        assert_ne!(block_2_a.hash(), block_2_b.hash());

        // Install both forks; 3_a is set last, making chain a canonical.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
        expected_anchor.append(block_3_a.hash());
        let state_anchor = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        // Sanity check: tip-derived MMRA and archival-state MMRA must agree.
        assert_eq!(
            expected_anchor, state_anchor,
            "Catching assumption about MMRA in tip and in archival state"
        );

        let response = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
                &expected_anchor,
            )
            .await
            .unwrap()
        };
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
                max_response_len: 14,
                anchor: expected_anchor,
            })),
            // Since the genesis block is the 1st known-and-canonical digest in
            // the list of known blocks, its immediate descendant, block_1, is
            // the first one returned.
            Action::Write(PeerMessage::BlockResponseBatch(response)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_2
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
2596

2597
    #[traced_test]
×
2598
    #[apply(shared_tokio_runtime)]
2599
    async fn request_unknown_height_doesnt_crash() {
2600
        // Scenario: Only genesis block is known. Peer requests block of height
2601
        // 2.
2602
        let network = Network::Main;
2603
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2604
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2605
                .await
2606
                .unwrap();
2607
        let peer_address = get_dummy_socket_address(0);
2608
        let mock = Mock::new(vec![
2609
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2610
            Action::Read(PeerMessage::Bye),
2611
        ]);
2612

2613
        let mut peer_loop_handler = PeerLoopHandler::new(
2614
            to_main_tx.clone(),
2615
            state_lock.clone(),
2616
            peer_address,
2617
            hsd,
2618
            false,
2619
            1,
2620
        );
2621

2622
        // This will return error if seen read/write order does not match that of the
2623
        // mocked object.
2624
        peer_loop_handler
2625
            .run_wrapper(mock, from_main_rx_clone)
2626
            .await
2627
            .unwrap();
2628

2629
        // Verify that peer is sanctioned for this nonsense.
2630
        assert!(state_lock
2631
            .lock_guard()
2632
            .await
2633
            .net
2634
            .get_peer_standing_from_database(peer_address.ip())
2635
            .await
2636
            .unwrap()
2637
            .standing
2638
            .is_negative());
2639
    }
2640

2641
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
        // returned.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);

        // Chain a: 1 -> 2_a -> 3_a.
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;
        // Chain b forks off block 1: 2_b -> 3_b.
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
            network,
        )
        .await;
        assert_ne!(block_2_a.hash(), block_2_b.hash());

        // Install both forks; 3_a is set last, making chain a canonical.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        // Height requests must resolve to the canonical-chain blocks 2_a/3_a,
        // not the fork blocks 2_b/3_b.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
            Action::Write(PeerMessage::Block(Box::new(
                block_3_a.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        // This will return error if seen read/write order does not match that of the
        // mocked object.
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
2704

2705
    #[traced_test]
×
2706
    #[apply(shared_tokio_runtime)]
2707
    async fn receival_of_block_notification_height_1() {
2708
        // Scenario: client only knows genesis block. Then receives block
2709
        // notification of height 1. Must request block 1.
2710
        let network = Network::Main;
2711
        let mut rng = StdRng::seed_from_u64(5552401);
2712
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2713
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2714
                .await
2715
                .unwrap();
2716
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2717
        let notification_height1 = (&block_1).into();
2718
        let mock = Mock::new(vec![
2719
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
2720
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2721
            Action::Read(PeerMessage::Bye),
2722
        ]);
2723

2724
        let peer_address = get_dummy_socket_address(0);
2725
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2726
            to_main_tx.clone(),
2727
            state_lock.clone(),
2728
            peer_address,
2729
            hsd,
2730
            false,
2731
            1,
2732
            block_1.header().timestamp,
2733
        );
2734
        peer_loop_handler
2735
            .run_wrapper(mock, from_main_rx_clone)
2736
            .await
2737
            .unwrap();
2738

2739
        drop(to_main_rx1);
2740
    }
2741

2742
    #[traced_test]
×
2743
    #[apply(shared_tokio_runtime)]
2744
    async fn receive_block_request_by_height_block_7() {
2745
        // Scenario: client only knows blocks up to height 7. Then receives block-
2746
        // request-by-height for height 7. Must respond with block 7.
2747
        let network = Network::Main;
2748
        let mut rng = StdRng::seed_from_u64(5552401);
2749
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
2750
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2751
                .await
2752
                .unwrap();
2753
        let genesis_block = Block::genesis(network);
2754
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2755
            &genesis_block,
2756
            Timestamp::hours(1),
2757
            rng.random(),
2758
            network,
2759
        )
2760
        .await;
2761
        let block7 = blocks.last().unwrap().to_owned();
2762
        let tip_height: u64 = block7.header().height.into();
2763
        assert_eq!(7, tip_height);
2764

2765
        for block in &blocks {
2766
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
2767
        }
2768

2769
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2770
        let mock = Mock::new(vec![
2771
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2772
            Action::Write(block7_response),
2773
            Action::Read(PeerMessage::Bye),
2774
        ]);
2775

2776
        let peer_address = get_dummy_socket_address(0);
2777
        let mut peer_loop_handler = PeerLoopHandler::new(
2778
            to_main_tx.clone(),
2779
            state_lock.clone(),
2780
            peer_address,
2781
            hsd,
2782
            false,
2783
            1,
2784
        );
2785
        peer_loop_handler
2786
            .run_wrapper(mock, from_main_rx_clone)
2787
            .await
2788
            .unwrap();
2789

2790
        drop(to_main_rx1);
2791
    }
2792

2793
    #[traced_test]
×
2794
    #[apply(shared_tokio_runtime)]
2795
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2796
        // Scenario: client only knows genesis block. Then receives block 1.
2797

2798
        let network = Network::Main;
2799
        let mut rng = StdRng::seed_from_u64(5550001);
2800
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2801
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2802
        let peer_address = get_dummy_socket_address(0);
2803

2804
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2805
        let mock = Mock::new(vec![
2806
            Action::Read(PeerMessage::Block(Box::new(
2807
                block_1.clone().try_into().unwrap(),
2808
            ))),
2809
            Action::Read(PeerMessage::Bye),
2810
        ]);
2811

2812
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2813
            to_main_tx.clone(),
2814
            state_lock.clone(),
2815
            peer_address,
2816
            hsd,
2817
            false,
2818
            1,
2819
            block_1.header().timestamp,
2820
        );
2821
        peer_loop_handler
2822
            .run_wrapper(mock, from_main_rx_clone)
2823
            .await?;
2824

2825
        // Verify that a block was sent to `main_loop`
2826
        match to_main_rx1.recv().await {
2827
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
2828
            _ => bail!("Did not find msg sent to main task"),
2829
        };
2830

2831
        match to_main_rx1.recv().await {
2832
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2833
            _ => bail!("Must receive remove of peer block max height"),
2834
        }
2835

2836
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2837
            bail!("peer map must be empty after closing connection gracefully");
2838
        }
2839

2840
        Ok(())
2841
    }
2842

2843
    #[traced_test]
×
2844
    #[apply(shared_tokio_runtime)]
2845
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
2846
        // In this scenario, the client only knows the genesis block (block 0) and then
2847
        // receives block 2, meaning that block 1 will have to be requested.
2848

2849
        let network = Network::Main;
2850
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2851
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2852
        let peer_address = get_dummy_socket_address(0);
2853
        let genesis_block: Block = state_lock
2854
            .lock_guard()
2855
            .await
2856
            .chain
2857
            .archival_state()
2858
            .get_tip()
2859
            .await;
2860
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
2861
            &genesis_block,
2862
            Timestamp::hours(1),
2863
            StdRng::seed_from_u64(5550001).random(),
2864
            network,
2865
        )
2866
        .await;
2867

2868
        let mock = Mock::new(vec![
2869
            Action::Read(PeerMessage::Block(Box::new(
2870
                block_2.clone().try_into().unwrap(),
2871
            ))),
2872
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
2873
            Action::Read(PeerMessage::Block(Box::new(
2874
                block_1.clone().try_into().unwrap(),
2875
            ))),
2876
            Action::Read(PeerMessage::Bye),
2877
        ]);
2878

2879
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2880
            to_main_tx.clone(),
2881
            state_lock.clone(),
2882
            peer_address,
2883
            hsd,
2884
            true,
2885
            1,
2886
            block_2.header().timestamp,
2887
        );
2888
        peer_loop_handler
2889
            .run_wrapper(mock, from_main_rx_clone)
2890
            .await?;
2891

2892
        match to_main_rx1.recv().await {
2893
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
2894
                if blocks[0].hash() != block_1.hash() {
2895
                    bail!("1st received block by main loop must be block 1");
2896
                }
2897
                if blocks[1].hash() != block_2.hash() {
2898
                    bail!("2nd received block by main loop must be block 2");
2899
                }
2900
            }
2901
            _ => bail!("Did not find msg sent to main task 1"),
2902
        };
2903
        match to_main_rx1.recv().await {
2904
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2905
            _ => bail!("Must receive remove of peer block max height"),
2906
        }
2907

2908
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2909
            bail!("peer map must be empty after closing connection gracefully");
2910
        }
2911

2912
        Ok(())
2913
    }
2914

2915
    #[traced_test]
×
2916
    #[apply(shared_tokio_runtime)]
2917
    async fn prevent_ram_exhaustion_test() -> Result<()> {
2918
        // In this scenario the peer sends more blocks than the client allows to store in the
2919
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
2920
        // process as the alternative is that the program will crash because it runs out of RAM.
2921

2922
        let network = Network::Main;
2923
        let mut rng = StdRng::seed_from_u64(5550001);
2924
        let (
2925
            _peer_broadcast_tx,
2926
            from_main_rx_clone,
2927
            to_main_tx,
2928
            mut to_main_rx1,
2929
            mut state_lock,
2930
            _hsd,
2931
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
2932
        let genesis_block = Block::genesis(network);
2933

2934
        // Restrict max number of blocks held in memory to 2.
2935
        let mut cli = state_lock.cli().clone();
2936
        cli.sync_mode_threshold = 2;
2937
        state_lock.set_cli(cli).await;
2938

2939
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
2940
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
2941
            &genesis_block,
2942
            Timestamp::hours(1),
2943
            rng.random(),
2944
            network,
2945
        )
2946
        .await;
2947
        state_lock.set_new_tip(block_1.clone()).await?;
2948

2949
        let mock = Mock::new(vec![
2950
            Action::Read(PeerMessage::Block(Box::new(
2951
                block_4.clone().try_into().unwrap(),
2952
            ))),
2953
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
2954
            Action::Read(PeerMessage::Block(Box::new(
2955
                block_3.clone().try_into().unwrap(),
2956
            ))),
2957
            Action::Read(PeerMessage::Bye),
2958
        ]);
2959

2960
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2961
            to_main_tx.clone(),
2962
            state_lock.clone(),
2963
            peer_address1,
2964
            hsd1,
2965
            true,
2966
            1,
2967
            block_4.header().timestamp,
2968
        );
2969
        peer_loop_handler
2970
            .run_wrapper(mock, from_main_rx_clone)
2971
            .await?;
2972

2973
        match to_main_rx1.recv().await {
2974
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2975
            _ => bail!("Must receive remove of peer block max height"),
2976
        }
2977

2978
        // Verify that no block is sent to main loop.
2979
        match to_main_rx1.try_recv() {
2980
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2981
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
2982
        };
2983
        drop(to_main_tx);
2984

2985
        // Verify that peer is sanctioned for failed fork reconciliation attempt
2986
        assert!(state_lock
2987
            .lock_guard()
2988
            .await
2989
            .net
2990
            .get_peer_standing_from_database(peer_address1.ip())
2991
            .await
2992
            .unwrap()
2993
            .standing
2994
            .is_negative());
2995

2996
        Ok(())
2997
    }
2998

2999
    #[traced_test]
×
3000
    #[apply(shared_tokio_runtime)]
3001
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3002
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3003
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
3004

3005
        let network = Network::Main;
3006
        let (
3007
            _peer_broadcast_tx,
3008
            from_main_rx_clone,
3009
            to_main_tx,
3010
            mut to_main_rx1,
3011
            mut state_lock,
3012
            hsd,
3013
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3014
            .await
3015
            .unwrap();
3016
        let peer_address: SocketAddr = get_dummy_socket_address(0);
3017
        let genesis_block = Block::genesis(network);
3018
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3019
            &genesis_block,
3020
            Timestamp::hours(1),
3021
            StdRng::seed_from_u64(5550001).random(),
3022
            network,
3023
        )
3024
        .await;
3025
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
3026

3027
        let mock = Mock::new(vec![
3028
            Action::Read(PeerMessage::Block(Box::new(
3029
                block_4.clone().try_into().unwrap(),
3030
            ))),
3031
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3032
            Action::Read(PeerMessage::Block(Box::new(
3033
                block_3.clone().try_into().unwrap(),
3034
            ))),
3035
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3036
            Action::Read(PeerMessage::Block(Box::new(
3037
                block_2.clone().try_into().unwrap(),
3038
            ))),
3039
            Action::Read(PeerMessage::Bye),
3040
        ]);
3041

3042
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3043
            to_main_tx.clone(),
3044
            state_lock.clone(),
3045
            peer_address,
3046
            hsd,
3047
            true,
3048
            1,
3049
            block_4.header().timestamp,
3050
        );
3051
        peer_loop_handler
3052
            .run_wrapper(mock, from_main_rx_clone)
3053
            .await
3054
            .unwrap();
3055

3056
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3057
            panic!("Did not find msg sent to main task");
3058
        };
3059
        assert_eq!(blocks[0].hash(), block_2.hash());
3060
        assert_eq!(blocks[1].hash(), block_3.hash());
3061
        assert_eq!(blocks[2].hash(), block_4.hash());
3062

3063
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3064
            panic!("Must receive remove of peer block max height");
3065
        };
3066

3067
        assert!(
3068
            state_lock.lock_guard().await.net.peer_map.is_empty(),
3069
            "peer map must be empty after closing connection gracefully"
3070
        );
3071
    }
3072

3073
    #[traced_test]
×
3074
    #[apply(shared_tokio_runtime)]
3075
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
3076
        // In this scenario, the client only knows the genesis block (block 0) and then
3077
        // receives block 3, meaning that block 2 and 1 will have to be requested.
3078

3079
        let network = Network::Main;
3080
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
3081
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3082
        let peer_address = get_dummy_socket_address(0);
3083
        let genesis_block = Block::genesis(network);
3084

3085
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
3086
            &genesis_block,
3087
            Timestamp::hours(1),
3088
            StdRng::seed_from_u64(5550001).random(),
3089
            network,
3090
        )
3091
        .await;
3092

3093
        let mock = Mock::new(vec![
3094
            Action::Read(PeerMessage::Block(Box::new(
3095
                block_3.clone().try_into().unwrap(),
3096
            ))),
3097
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3098
            Action::Read(PeerMessage::Block(Box::new(
3099
                block_2.clone().try_into().unwrap(),
3100
            ))),
3101
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3102
            Action::Read(PeerMessage::Block(Box::new(
3103
                block_1.clone().try_into().unwrap(),
3104
            ))),
3105
            Action::Read(PeerMessage::Bye),
3106
        ]);
3107

3108
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3109
            to_main_tx.clone(),
3110
            state_lock.clone(),
3111
            peer_address,
3112
            hsd,
3113
            true,
3114
            1,
3115
            block_3.header().timestamp,
3116
        );
3117
        peer_loop_handler
3118
            .run_wrapper(mock, from_main_rx_clone)
3119
            .await?;
3120

3121
        match to_main_rx1.recv().await {
3122
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3123
                if blocks[0].hash() != block_1.hash() {
3124
                    bail!("1st received block by main loop must be block 1");
3125
                }
3126
                if blocks[1].hash() != block_2.hash() {
3127
                    bail!("2nd received block by main loop must be block 2");
3128
                }
3129
                if blocks[2].hash() != block_3.hash() {
3130
                    bail!("3rd received block by main loop must be block 3");
3131
                }
3132
            }
3133
            _ => bail!("Did not find msg sent to main task"),
3134
        };
3135
        match to_main_rx1.recv().await {
3136
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3137
            _ => bail!("Must receive remove of peer block max height"),
3138
        }
3139

3140
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3141
            bail!("peer map must be empty after closing connection gracefully");
3142
        }
3143

3144
        Ok(())
3145
    }
3146

3147
    #[traced_test]
×
3148
    #[apply(shared_tokio_runtime)]
3149
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3150
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3151
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3152
        // But the requests are interrupted by the peer sending another message: a new block
3153
        // notification.
3154

3155
        let network = Network::Main;
3156
        let (
3157
            _peer_broadcast_tx,
3158
            from_main_rx_clone,
3159
            to_main_tx,
3160
            mut to_main_rx1,
3161
            mut state_lock,
3162
            hsd,
3163
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3164
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3165
        let genesis_block: Block = state_lock
3166
            .lock_guard()
3167
            .await
3168
            .chain
3169
            .archival_state()
3170
            .get_tip()
3171
            .await;
3172

3173
        let [block_1, block_2, block_3, block_4, block_5] =
3174
            fake_valid_sequence_of_blocks_for_tests(
3175
                &genesis_block,
3176
                Timestamp::hours(1),
3177
                StdRng::seed_from_u64(5550001).random(),
3178
                network,
3179
            )
3180
            .await;
3181
        state_lock.set_new_tip(block_1.clone()).await?;
3182

3183
        let mock = Mock::new(vec![
3184
            Action::Read(PeerMessage::Block(Box::new(
3185
                block_4.clone().try_into().unwrap(),
3186
            ))),
3187
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3188
            Action::Read(PeerMessage::Block(Box::new(
3189
                block_3.clone().try_into().unwrap(),
3190
            ))),
3191
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3192
            //
3193
            // Now make the interruption of the block reconciliation process
3194
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3195
            //
3196
            // Complete the block reconciliation process by requesting the last block
3197
            // in this process, to get back to a mutually known block.
3198
            Action::Read(PeerMessage::Block(Box::new(
3199
                block_2.clone().try_into().unwrap(),
3200
            ))),
3201
            //
3202
            // Then anticipate the request of the block that was announced
3203
            // in the interruption.
3204
            // Note that we cannot anticipate the response, as only the main
3205
            // task writes to the database. And the database needs to be updated
3206
            // for the handling of block 5 to be done correctly.
3207
            Action::Write(PeerMessage::BlockRequestByHeight(
3208
                block_5.kernel.header.height,
3209
            )),
3210
            Action::Read(PeerMessage::Bye),
3211
        ]);
3212

3213
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3214
            to_main_tx.clone(),
3215
            state_lock.clone(),
3216
            peer_socket_address,
3217
            hsd,
3218
            false,
3219
            1,
3220
            block_5.header().timestamp,
3221
        );
3222
        peer_loop_handler
3223
            .run_wrapper(mock, from_main_rx_clone)
3224
            .await?;
3225

3226
        match to_main_rx1.recv().await {
3227
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3228
                if blocks[0].hash() != block_2.hash() {
3229
                    bail!("1st received block by main loop must be block 1");
3230
                }
3231
                if blocks[1].hash() != block_3.hash() {
3232
                    bail!("2nd received block by main loop must be block 2");
3233
                }
3234
                if blocks[2].hash() != block_4.hash() {
3235
                    bail!("3rd received block by main loop must be block 3");
3236
                }
3237
            }
3238
            _ => bail!("Did not find msg sent to main task"),
3239
        };
3240
        match to_main_rx1.recv().await {
3241
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3242
            _ => bail!("Must receive remove of peer block max height"),
3243
        }
3244

3245
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3246
            bail!("peer map must be empty after closing connection gracefully");
3247
        }
3248

3249
        Ok(())
3250
    }
3251

3252
    #[traced_test]
×
3253
    #[apply(shared_tokio_runtime)]
3254
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3255
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
3256
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3257
        // But the requests are interrupted by the peer sending another message: a request
3258
        // for a list of peers.
3259

3260
        let network = Network::Main;
3261
        let (
3262
            _peer_broadcast_tx,
3263
            from_main_rx_clone,
3264
            to_main_tx,
3265
            mut to_main_rx1,
3266
            mut state_lock,
3267
            _hsd,
3268
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3269
        let genesis_block = Block::genesis(network);
3270
        let peer_infos: Vec<PeerInfo> = state_lock
3271
            .lock_guard()
3272
            .await
3273
            .net
3274
            .peer_map
3275
            .clone()
3276
            .into_values()
3277
            .collect::<Vec<_>>();
3278

3279
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3280
            &genesis_block,
3281
            Timestamp::hours(1),
3282
            StdRng::seed_from_u64(5550001).random(),
3283
            network,
3284
        )
3285
        .await;
3286
        state_lock.set_new_tip(block_1.clone()).await?;
3287

3288
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3289
        let expected_peer_list_resp = vec![
3290
            (
3291
                peer_infos[0].listen_address().unwrap(),
3292
                peer_infos[0].instance_id(),
3293
            ),
3294
            (sa_1, hsd_1.instance_id),
3295
        ];
3296
        let mock = Mock::new(vec![
3297
            Action::Read(PeerMessage::Block(Box::new(
3298
                block_4.clone().try_into().unwrap(),
3299
            ))),
3300
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3301
            Action::Read(PeerMessage::Block(Box::new(
3302
                block_3.clone().try_into().unwrap(),
3303
            ))),
3304
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3305
            //
3306
            // Now make the interruption of the block reconciliation process
3307
            Action::Read(PeerMessage::PeerListRequest),
3308
            //
3309
            // Answer the request for a peer list
3310
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3311
            //
3312
            // Complete the block reconciliation process by requesting the last block
3313
            // in this process, to get back to a mutually known block.
3314
            Action::Read(PeerMessage::Block(Box::new(
3315
                block_2.clone().try_into().unwrap(),
3316
            ))),
3317
            Action::Read(PeerMessage::Bye),
3318
        ]);
3319

3320
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3321
            to_main_tx,
3322
            state_lock.clone(),
3323
            sa_1,
3324
            hsd_1,
3325
            true,
3326
            1,
3327
            block_4.header().timestamp,
3328
        );
3329
        peer_loop_handler
3330
            .run_wrapper(mock, from_main_rx_clone)
3331
            .await?;
3332

3333
        // Verify that blocks are sent to `main_loop` in expected ordering
3334
        match to_main_rx1.recv().await {
3335
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3336
                if blocks[0].hash() != block_2.hash() {
3337
                    bail!("1st received block by main loop must be block 1");
3338
                }
3339
                if blocks[1].hash() != block_3.hash() {
3340
                    bail!("2nd received block by main loop must be block 2");
3341
                }
3342
                if blocks[2].hash() != block_4.hash() {
3343
                    bail!("3rd received block by main loop must be block 3");
3344
                }
3345
            }
3346
            _ => bail!("Did not find msg sent to main task"),
3347
        };
3348

3349
        assert_eq!(
3350
            1,
3351
            state_lock.lock_guard().await.net.peer_map.len(),
3352
            "One peer must remain in peer list after peer_1 closed gracefully"
3353
        );
3354

3355
        Ok(())
3356
    }
3357

3358
    #[traced_test]
×
3359
    #[apply(shared_tokio_runtime)]
3360
    async fn receive_transaction_request() {
3361
        let network = Network::Main;
3362
        let dummy_tx = invalid_empty_single_proof_transaction();
3363
        let txid = dummy_tx.kernel.txid();
3364

3365
        for transaction_is_known in [false, true] {
3366
            let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3367
                get_test_genesis_setup(network, 1, cli_args::Args::default())
3368
                    .await
3369
                    .unwrap();
3370
            if transaction_is_known {
3371
                state_lock
3372
                    .lock_guard_mut()
3373
                    .await
3374
                    .mempool_insert(dummy_tx.clone(), TransactionOrigin::Own)
3375
                    .await;
3376
            }
3377

3378
            let mock = if transaction_is_known {
3379
                Mock::new(vec![
3380
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3381
                    Action::Write(PeerMessage::Transaction(Box::new(
3382
                        (&dummy_tx).try_into().unwrap(),
3383
                    ))),
3384
                    Action::Read(PeerMessage::Bye),
3385
                ])
3386
            } else {
3387
                Mock::new(vec![
3388
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3389
                    Action::Read(PeerMessage::Bye),
3390
                ])
3391
            };
3392

3393
            let hsd = get_dummy_handshake_data_for_genesis(network);
3394
            let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3395
            let mut peer_loop_handler = PeerLoopHandler::new(
3396
                to_main_tx,
3397
                state_lock,
3398
                get_dummy_socket_address(0),
3399
                hsd,
3400
                true,
3401
                1,
3402
            );
3403

3404
            peer_loop_handler
3405
                .run(mock, from_main_rx, &mut peer_state)
3406
                .await
3407
                .unwrap();
3408
        }
3409
    }
3410

3411
    #[traced_test]
×
3412
    #[apply(shared_tokio_runtime)]
3413
    async fn empty_mempool_request_tx_test() {
3414
        // In this scenario the client receives a transaction notification from
3415
        // a peer of a transaction it doesn't know; the client must then request it.
3416

3417
        let network = Network::Main;
3418
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
3419
            get_test_genesis_setup(network, 1, cli_args::Args::default())
3420
                .await
3421
                .unwrap();
3422

3423
        let spending_key = state_lock
3424
            .lock_guard()
3425
            .await
3426
            .wallet_state
3427
            .wallet_entropy
3428
            .nth_symmetric_key_for_tests(0);
3429
        let genesis_block = Block::genesis(network);
3430
        let now = genesis_block.kernel.header.timestamp;
3431
        let config = TxCreationConfig::default()
3432
            .recover_change_off_chain(spending_key.into())
3433
            .with_prover_capability(TxProvingCapability::ProofCollection);
3434
        let transaction_1: Transaction = state_lock
3435
            .api()
3436
            .tx_initiator_internal()
3437
            .create_transaction(
3438
                Default::default(),
3439
                NativeCurrencyAmount::coins(0),
3440
                now,
3441
                config,
3442
            )
3443
            .await
3444
            .unwrap()
3445
            .transaction
3446
            .into();
3447

3448
        // Build the resulting transaction notification
3449
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3450
        let mock = Mock::new(vec![
3451
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3452
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3453
            Action::Read(PeerMessage::Transaction(Box::new(
3454
                (&transaction_1).try_into().unwrap(),
3455
            ))),
3456
            Action::Read(PeerMessage::Bye),
3457
        ]);
3458

3459
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3460

3461
        // Mock a timestamp to allow transaction to be considered valid
3462
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3463
            to_main_tx,
3464
            state_lock.clone(),
3465
            get_dummy_socket_address(0),
3466
            hsd_1.clone(),
3467
            true,
3468
            1,
3469
            now,
3470
        );
3471

3472
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3473

3474
        assert!(
3475
            state_lock.lock_guard().await.mempool.is_empty(),
3476
            "Mempool must be empty at init"
3477
        );
3478
        peer_loop_handler
3479
            .run(mock, from_main_rx_clone, &mut peer_state)
3480
            .await
3481
            .unwrap();
3482

3483
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3484
        // by the `main_loop`.
3485
        match to_main_rx1.recv().await {
3486
            Some(PeerTaskToMain::Transaction(_)) => (),
3487
            _ => panic!("Must receive remove of peer block max height"),
3488
        };
3489
    }
3490

3491
    #[traced_test]
×
3492
    #[apply(shared_tokio_runtime)]
3493
    async fn populated_mempool_request_tx_test() -> Result<()> {
3494
        // In this scenario the peer is informed of a transaction that it already knows
3495

3496
        let network = Network::Main;
3497
        let (
3498
            _peer_broadcast_tx,
3499
            from_main_rx_clone,
3500
            to_main_tx,
3501
            mut to_main_rx1,
3502
            mut state_lock,
3503
            _hsd,
3504
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3505
            .await
3506
            .unwrap();
3507
        let spending_key = state_lock
3508
            .lock_guard()
3509
            .await
3510
            .wallet_state
3511
            .wallet_entropy
3512
            .nth_symmetric_key_for_tests(0);
3513

3514
        let genesis_block = Block::genesis(network);
3515
        let now = genesis_block.kernel.header.timestamp;
3516
        let config = TxCreationConfig::default()
3517
            .recover_change_off_chain(spending_key.into())
3518
            .with_prover_capability(TxProvingCapability::ProofCollection);
3519
        let transaction_1: Transaction = state_lock
3520
            .api()
3521
            .tx_initiator_internal()
3522
            .create_transaction(
3523
                Default::default(),
3524
                NativeCurrencyAmount::coins(0),
3525
                now,
3526
                config,
3527
            )
3528
            .await
3529
            .unwrap()
3530
            .transaction
3531
            .into();
3532

3533
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3534
        let mut peer_loop_handler = PeerLoopHandler::new(
3535
            to_main_tx,
3536
            state_lock.clone(),
3537
            get_dummy_socket_address(0),
3538
            hsd_1.clone(),
3539
            true,
3540
            1,
3541
        );
3542
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3543

3544
        assert!(
3545
            state_lock.lock_guard().await.mempool.is_empty(),
3546
            "Mempool must be empty at init"
3547
        );
3548
        state_lock
3549
            .lock_guard_mut()
3550
            .await
3551
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
3552
            .await;
3553
        assert!(
3554
            !state_lock.lock_guard().await.mempool.is_empty(),
3555
            "Mempool must be non-empty after insertion"
3556
        );
3557

3558
        // Run the peer loop and verify expected exchange -- namely that the
3559
        // tx notification is received and the the transaction is *not*
3560
        // requested.
3561
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3562
        let mock = Mock::new(vec![
3563
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3564
            Action::Read(PeerMessage::Bye),
3565
        ]);
3566
        peer_loop_handler
3567
            .run(mock, from_main_rx_clone, &mut peer_state)
3568
            .await
3569
            .unwrap();
3570

3571
        // nothing is allowed to be sent to `main_loop`
3572
        match to_main_rx1.try_recv() {
3573
            Err(TryRecvError::Empty) => (),
3574
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3575
            Ok(_) => panic!("to_main channel must be empty"),
3576
        };
3577
        Ok(())
3578
    }
3579

3580
    mod block_proposals {
        use super::*;
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;

        // Bundle of channel endpoints and state produced by `genesis_setup`,
        // so each test can destructure only the parts it needs.
        struct TestSetup {
            peer_loop_handler: PeerLoopHandler,
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
            peer_state: MutablePeerState,
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
            genesis_block: Block,
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
        }

        /// Set up a node whose tip is the genesis block, with a peer-loop
        /// handler for a peer that also claims genesis as its tip.
        async fn genesis_setup(network: Network) -> TestSetup {
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
                get_test_genesis_setup(network, 0, cli_args::Args::default())
                    .await
                    .unwrap();
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
            let peer_loop_handler = PeerLoopHandler::new(
                to_main_tx.clone(),
                alice.clone(),
                get_dummy_socket_address(0),
                peer_hsd.clone(),
                true,
                1,
            );
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);

            TestSetup {
                peer_broadcast_tx,
                peer_loop_handler,
                to_main_rx,
                from_main_rx,
                peer_state,
                to_main_tx,
                genesis_block: Block::genesis(network),
            }
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn accept_block_proposal_height_one() {
            // Node knows genesis block, receives a block proposal for block 1
            // and must accept this. Verify that main loop is informed of block
            // proposal.
            let TestSetup {
                peer_broadcast_tx,
                mut peer_loop_handler,
                mut to_main_rx,
                from_main_rx,
                mut peer_state,
                to_main_tx,
                genesis_block,
            } = genesis_setup(Network::Main).await;
            let block1 = fake_valid_block_for_tests(
                &peer_loop_handler.global_state_lock,
                StdRng::seed_from_u64(5550001).random(),
            )
            .await;

            // Peer sends the proposal directly, then disconnects.
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
                Action::Read(PeerMessage::Bye),
            ]);
            peer_loop_handler
                .run(mock, from_main_rx, &mut peer_state)
                .await
                .unwrap();

            // The proposal forwarded to main must build on the genesis block.
            match to_main_rx.try_recv().unwrap() {
                PeerTaskToMain::BlockProposal(block) => {
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
                }
                _ => panic!("Expected main loop to be informed of block proposal"),
            };

            // Keep the channels alive until after the assertions, then close.
            drop(to_main_tx);
            drop(peer_broadcast_tx);
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn accept_block_proposal_notification_height_one() {
            // Node knows genesis block, receives a block proposal notification
            // for block 1 and must accept this by requesting the block
            // proposal from peer.
            let TestSetup {
                peer_broadcast_tx,
                mut peer_loop_handler,
                to_main_rx: _,
                from_main_rx,
                mut peer_state,
                to_main_tx,
                ..
            } = genesis_setup(Network::Main).await;
            let block1 = fake_valid_block_for_tests(
                &peer_loop_handler.global_state_lock,
                StdRng::seed_from_u64(5550001).random(),
            )
            .await;

            // Expected exchange: notification in, proposal request out.
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
                Action::Write(PeerMessage::BlockProposalRequest(
                    BlockProposalRequest::new(block1.body().mast_hash()),
                )),
                Action::Read(PeerMessage::Bye),
            ]);
            peer_loop_handler
                .run(mock, from_main_rx, &mut peer_state)
                .await
                .unwrap();

            drop(to_main_tx);
            drop(peer_broadcast_tx);
        }
    }
3700

3701
    mod proof_qualities {
        use strum::IntoEnumIterator;

        use super::*;
        use crate::config_models::cli_args;
        use crate::models::blockchain::transaction::Transaction;
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
        use crate::models::state::wallet::transaction_output::TxOutput;
        use crate::tests::shared::mock_genesis_global_state;

        /// Produce a transaction backed by a proof of the requested quality,
        /// spending from the devnet wallet.
        async fn tx_of_proof_quality(
            network: Network,
            quality: TransactionProofQuality,
        ) -> Transaction {
            let wallet_secret = WalletEntropy::devnet_wallet();
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
            let alice_gsl =
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
                    .await;
            let alice = alice_gsl.lock_guard().await;
            let genesis_block = alice.chain.light_state();
            // Timestamp well past genesis; presumably far enough for the
            // devnet premine to be spendable -- TODO confirm.
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
            // Map the desired proof quality onto the prover capability that
            // produces it.
            let prover_capability = match quality {
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
            };
            let config = TxCreationConfig::default()
                .recover_change_off_chain(alice_key.into())
                .with_prover_capability(prover_capability);
            alice_gsl
                .api()
                .tx_initiator_internal()
                .create_transaction(
                    Vec::<TxOutput>::new().into(),
                    NativeCurrencyAmount::coins(1),
                    in_seven_months,
                    config,
                )
                .await
                .unwrap()
                .transaction()
                .clone()
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn client_favors_higher_proof_quality() {
            // In this scenario the peer is informed of a transaction that it
            // already knows, and it's tested that it checks the proof quality
            // field and verifies that it exceeds the proof in the mempool
            // before requesting the transaction.
            let network = Network::Main;
            let proof_collection_tx =
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
            let single_proof_tx =
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;

            // Exercise all four (mempool quality, announced quality) pairs.
            for (own_tx_pq, new_tx_pq) in
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
            {
                use TransactionProofQuality::*;

                let (
                    _peer_broadcast_tx,
                    from_main_rx_clone,
                    to_main_tx,
                    mut to_main_rx1,
                    mut alice,
                    handshake_data,
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
                    .await
                    .unwrap();

                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
                    (ProofCollection, ProofCollection) => {
                        (&proof_collection_tx, &proof_collection_tx)
                    }
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
                };

                // Plant "our" transaction in the mempool before the peer
                // announces its (possibly better-proven) copy.
                alice
                    .lock_guard_mut()
                    .await
                    .mempool_insert((*own_tx).to_owned(), TransactionOrigin::Foreign)
                    .await;

                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();

                // Only a strictly better proof quality should trigger a
                // transaction request.
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
                let mock = if own_proof_is_supreme {
                    Mock::new(vec![
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
                        Action::Read(PeerMessage::Bye),
                    ])
                } else {
                    Mock::new(vec![
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
                        Action::Read(PeerMessage::Transaction(Box::new(
                            new_tx.try_into().unwrap(),
                        ))),
                        Action::Read(PeerMessage::Bye),
                    ])
                };

                // Mock the clock to the tx timestamp so it is considered valid.
                let now = proof_collection_tx.kernel.timestamp;
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
                    to_main_tx,
                    alice.clone(),
                    get_dummy_socket_address(0),
                    handshake_data.clone(),
                    true,
                    1,
                    now,
                );
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);

                peer_loop_handler
                    .run(mock, from_main_rx_clone, &mut peer_state)
                    .await
                    .unwrap();

                if own_proof_is_supreme {
                    // Our proof was at least as good: nothing may reach main.
                    match to_main_rx1.try_recv() {
                        Err(TryRecvError::Empty) => (),
                        Err(TryRecvError::Disconnected) => {
                            panic!("to_main channel must still be open")
                        }
                        Ok(_) => panic!("to_main channel must be empty"),
                    }
                } else {
                    // The peer's proof was better: the fetched tx must be
                    // forwarded to the main loop.
                    match to_main_rx1.try_recv() {
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
                        Err(TryRecvError::Disconnected) => {
                            panic!("to_main channel must still be open")
                        }
                        Ok(PeerTaskToMain::Transaction(_)) => (),
                        _ => panic!("Unexpected result from channel"),
                    }
                }
            }
        }
    }
3846

3847
    mod sync_challenges {
        use super::*;
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn bad_sync_challenge_height_greater_than_tip() {
            // Criterion: Challenge height may not exceed that of tip in the
            // request.

            let network = Network::Main;
            let (
                _alice_main_to_peer_tx,
                alice_main_to_peer_rx,
                alice_peer_to_main_tx,
                alice_peer_to_main_rx,
                mut alice,
                alice_hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
                .await
                .unwrap();
            let genesis_block: Block = Block::genesis(network);
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                [0u8; 32],
                network,
            )
            .await;
            for block in &blocks {
                alice.set_new_tip(block.clone()).await.unwrap();
            }

            // Claim blocks[9] as tip but challenge at the height of the last
            // block, which is greater -- an invalid challenge.
            // NOTE(review): `bh12` names the height of the 11th block after
            // genesis; the "12" looks off-by-one -- verify intent.
            let bh12 = blocks.last().unwrap().header().height;
            let sync_challenge = SyncChallenge {
                tip_digest: blocks[9].hash(),
                challenges: [bh12; 10],
            };
            let alice_p2p_messages = Mock::new(vec![
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
                Action::Read(PeerMessage::Bye),
            ]);

            let peer_address = get_dummy_socket_address(0);
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
                alice_peer_to_main_tx.clone(),
                alice.clone(),
                peer_address,
                alice_hsd,
                false,
                1,
            );
            alice_peer_loop_handler
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
                .await
                .unwrap();

            drop(alice_peer_to_main_rx);

            // The invalid challenge must have been punished.
            let latest_sanction = alice
                .lock_guard()
                .await
                .net
                .get_peer_standing_from_database(peer_address.ip())
                .await
                .unwrap();
            assert_eq!(
                NegativePeerSanction::InvalidSyncChallenge,
                latest_sanction
                    .latest_punishment
                    .expect("peer must be sanctioned")
                    .0
            );
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
            // Criterion: Challenge may not point to genesis block, or block 1, as
            // tip.

            let network = Network::Main;
            let genesis_block: Block = Block::genesis(network);

            let alice_cli = cli_args::Args::default();
            let (
                _alice_main_to_peer_tx,
                alice_main_to_peer_rx,
                alice_peer_to_main_tx,
                alice_peer_to_main_rx,
                alice,
                alice_hsd,
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();

            // A challenge claiming genesis as tip is invalid by definition.
            let sync_challenge = SyncChallenge {
                tip_digest: genesis_block.hash(),
                challenges: [BlockHeight::genesis(); 10],
            };

            let alice_p2p_messages = Mock::new(vec![
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
                Action::Read(PeerMessage::Bye),
            ]);

            let peer_address = get_dummy_socket_address(0);
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
                alice_peer_to_main_tx.clone(),
                alice.clone(),
                peer_address,
                alice_hsd,
                false,
                1,
            );
            alice_peer_loop_handler
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
                .await
                .unwrap();

            drop(alice_peer_to_main_rx);

            // The peer must be sanctioned, not crash the client.
            let latest_sanction = alice
                .lock_guard()
                .await
                .net
                .get_peer_standing_from_database(peer_address.ip())
                .await
                .unwrap();
            assert_eq!(
                NegativePeerSanction::InvalidSyncChallenge,
                latest_sanction
                    .latest_punishment
                    .expect("peer must be sanctioned")
                    .0
            );
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn sync_challenge_happy_path() -> Result<()> {
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
            // sync mode.

            let mut rng = rand::rng();
            let network = Network::Main;
            let genesis_block: Block = Block::genesis(network);

            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
            let alice_cli = cli_args::Args {
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
                ..Default::default()
            };
            let (
                _alice_main_to_peer_tx,
                alice_main_to_peer_rx,
                alice_peer_to_main_tx,
                mut alice_peer_to_main_rx,
                mut alice,
                alice_hsd,
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
            let _alice_socket_address = get_dummy_socket_address(0);

            let (
                _bob_main_to_peer_tx,
                _bob_main_to_peer_rx,
                _bob_peer_to_main_tx,
                _bob_peer_to_main_rx,
                mut bob,
                _bob_hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let bob_socket_address = get_dummy_socket_address(0);

            let now = genesis_block.header().timestamp + Timestamp::hours(1);
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
            assert!(
                block_1.is_valid(&genesis_block, now, network).await,
                "Block must be valid for this test to make sense"
            );
            let alice_tip = &block_1;
            alice.set_new_tip(block_1.clone()).await?;
            bob.set_new_tip(block_1.clone()).await?;

            // produce enough blocks to ensure alice needs to go into sync mode
            // with this block notification.
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
                &block_1,
                TARGET_BLOCK_INTERVAL,
                rng.random(),
                network,
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
            )
            .await;
            for block in &blocks {
                bob.set_new_tip(block.clone()).await?;
            }
            let bob_tip = blocks.last().unwrap();

            let block_notification_from_bob = PeerBlockNotification {
                hash: bob_tip.hash(),
                height: bob_tip.header().height,
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
            };

            // Pre-compute the challenge Alice's peer loop will generate by
            // seeding a clone of its RNG with the same seed (set on the
            // handler below via `set_rng`).
            let alice_rng_seed = rng.random::<[u8; 32]>();
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
            let sync_challenge_from_alice = SyncChallenge::generate(
                &block_notification_from_bob,
                alice_tip.header().height,
                alice_rng_clone.random(),
            );

            println!(
                "sync challenge from alice:\n{:?}",
                sync_challenge_from_alice
            );

            let sync_challenge_response_from_bob = bob
                .lock_guard()
                .await
                .response_to_sync_challenge(sync_challenge_from_alice)
                .await
                .expect("should be able to respond to sync challenge");

            let alice_p2p_messages = Mock::new(vec![
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
                    sync_challenge_response_from_bob,
                ))),
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
                // The absence of a Write here checks that a 2nd challenge isn't sent
                // when a successful was just received.
                Action::Read(PeerMessage::Bye),
            ]);

            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
                alice_peer_to_main_tx.clone(),
                alice.clone(),
                bob_socket_address,
                alice_hsd,
                false,
                1,
                bob_tip.header().timestamp,
            );
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
            alice_peer_loop_handler
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
                .await?;

            // AddPeerMaxBlockHeight message triggered *after* sync challenge
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
            expected_anchor_mmra.append(bob_tip.hash());
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
                peer_address: bob_socket_address,
                claimed_height: bob_tip.header().height,
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
                claimed_block_mmra: expected_anchor_mmra,
            };
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
            assert_eq!(
                expected_message_from_alice_peer_loop,
                observed_message_from_alice_peer_loop
            );

            Ok(())
        }
    }
4115
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc