• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 15027636480

14 May 2025 05:53PM UTC coverage: 71.729% (-0.02%) from 71.744%
15027636480

Pull #592

github

web-flow
Merge e1c7119d6 into 825a3b2f8
Pull Request #592: Bootstrap with downloaded blocks

83 of 124 new or added lines in 6 files covered. (66.94%)

19 existing lines in 4 files now uncovered.

20051 of 27954 relevant lines covered (71.73%)

381258.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.41
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
// Default number of blocks per batch response — presumably used by the
// batch-request handler further down this file; TODO confirm at use site.
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
// Upper bound on entries returned for a `PeerListRequest` and accepted in a
// `PeerListResponse` (exceeding it is punished with `FloodPeerListResponse`).
const MAX_PEER_LIST_LENGTH: usize = 10;
// Smallest batch size a peer may request — NOTE(review): use site not visible
// in this chunk; verify against the batch-request handler.
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;

// Named return values for `handle_peer_message`, which returns Ok(true) when
// the connection should be closed and Ok(false) when it should stay open.
const KEEP_CONNECTION_ALIVE: bool = false;
const DISCONNECT_CONNECTION: bool = true;

// Numeric representation of a peer's standing score.
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP
///
/// also handles messages from main task over the main-to-peer-tasks broadcast
/// channel.
#[derive(Debug, Clone)]
pub struct PeerLoopHandler {
    // Channel for forwarding validated data (e.g. new blocks, peer-discovery
    // answers) from this peer task to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
    // Shared handle to the global node state; locked for read or write as
    // needed by the message handlers.
    global_state_lock: GlobalStateLock,
    // Address of the connected peer; used for logging and peer-map lookups.
    peer_address: SocketAddr,
    // Handshake data exchanged with this peer at connection setup.
    peer_handshake_data: HandshakeData,
    // Presumably true when the peer initiated the connection — TODO confirm
    // against the connection-setup code.
    inbound_connection: bool,
    // Hop distance to this peer; peers revealed by it are reported to main
    // with distance + 1.
    distance: u8,
    // RNG used e.g. for generating sync challenges; replaceable in tests via
    // `set_rng` to derandomize behavior.
    rng: StdRng,
    // When set (test builds only), `now()` returns this instead of the wall
    // clock.
    #[cfg(test)]
    mock_now: Option<Timestamp>,
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
28✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
28✔
94
        global_state_lock: GlobalStateLock,
28✔
95
        peer_address: SocketAddr,
28✔
96
        peer_handshake_data: HandshakeData,
28✔
97
        inbound_connection: bool,
28✔
98
        distance: u8,
28✔
99
    ) -> Self {
28✔
100
        Self {
28✔
101
            to_main_tx,
28✔
102
            global_state_lock,
28✔
103
            peer_address,
28✔
104
            peer_handshake_data,
28✔
105
            inbound_connection,
28✔
106
            distance,
28✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
28✔
108
            #[cfg(test)]
28✔
109
            mock_now: None,
28✔
110
        }
28✔
111
    }
28✔
112

113
    /// Allows for mocked timestamps such that time dependencies may be tested.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        // Delegate to the primary constructor, then pin the clock so that
        // `now()` becomes deterministic.
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);
        handler
    }
20✔
135

136
    /// Overwrite the random number generator object with a specific one.
    ///
    /// Useful for derandomizing tests. Replaces the RNG that was seeded from
    /// the thread-local generator in the constructor.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        self.rng = rng;
    }
1✔
143

144
    /// Current timestamp.
    ///
    /// In test builds, returns the time mocked via `with_mocked_time` when one
    /// was set; otherwise (and always in non-test builds) the wall clock.
    fn now(&self) -> Timestamp {
        #[cfg(not(test))]
        {
            Timestamp::now()
        }
        #[cfg(test)]
        {
            // Mocked clock takes precedence; wall clock is the fallback.
            self.mock_now.unwrap_or(Timestamp::now())
        }
    }
54✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
199
        if sanction_result.is_err() {
18✔
200
            error!("Cannot reward banned peer");
×
201
        }
18✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
204
    }
19✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
    /// valid. Use with a list of blocks or a single block. When the
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
    /// list is the `i`th block. The parent of element zero in this list is
    /// `parent_of_first_block`.
    ///
    /// # Return Value
    ///  - `Err` when the connection should be closed;
    ///  - `Ok(None)` if some block is invalid
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
    ///    are not syncing;
    ///  - `Ok(None)` if the last block has insufficient height and we are
    ///    syncing;
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
    ///    highest height in the batch.
    ///
    /// A return value of Ok(Some(_)) means that the message was passed on to
    /// main loop.
    ///
    /// # Locking
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    ///
    /// # Panics
    ///
    ///  - Panics if called with the empty list.
    async fn handle_blocks(
        &mut self,
        received_blocks: Vec<Block>,
        parent_of_first_block: Block,
    ) -> Result<Option<BlockHeight>> {
        debug!(
            "attempting to validate {} {}",
            received_blocks.len(),
            if received_blocks.len() == 1 {
                "block"
            } else {
                "blocks"
            }
        );
        let now = self.now();
        debug!("validating with respect to current timestamp {now}");
        // Walk the chain in received order, checking each block against its
        // predecessor; the first block's predecessor is the supplied parent.
        let mut previous_block = &parent_of_first_block;
        for new_block in &received_blocks {
            // Both checks are computed eagerly; the (expensive) validity
            // predicate runs even when the PoW check already failed.
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
            let new_block_is_valid = new_block
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
                .await;
            debug!("new block is valid? {new_block_is_valid}");
            if !new_block_has_proof_of_work {
                warn!(
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
                warn!(
                    "Proof of work should be {} (or more) but was [{}].",
                    previous_block.kernel.header.difficulty.target(),
                    new_block.hash().values().iter().join(", ")
                );
                // Sanction the peer; `?` propagates the ban-induced error and
                // closes the connection.
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block due to insufficient PoW");
                return Ok(None);
            } else if !new_block_is_valid {
                warn!(
                    "Received invalid block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block: invalid block");
                return Ok(None);
            }
            info!(
                "Block with height {} is valid. mined: {}",
                new_block.kernel.header.height,
                new_block.kernel.header.timestamp.standard_format()
            );

            previous_block = new_block;
        }

        // evaluate the fork choice rule
        debug!("Checking last block's canonicity ...");
        // unwrap is covered by the documented panic on an empty list.
        let last_block = received_blocks.last().unwrap();
        let is_canonical = self
            .global_state_lock
            .lock_guard()
            .await
            .incoming_block_is_more_canonical(last_block);
        let last_block_height = last_block.header().height;
        // NOTE(review): the read lock is released and re-acquired between the
        // canonicity check and the sync-anchor check; state may change in
        // between — confirm this is acceptable.
        let sync_mode_active_and_have_new_champion = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .sync_anchor
            .as_ref()
            .is_some_and(|x| {
                // A missing champion counts as "have new champion".
                x.champion
                    .is_none_or(|(height, _)| height < last_block_height)
            });
        if !is_canonical && !sync_mode_active_and_have_new_champion {
            warn!(
                "Received {} blocks from peer but incoming blocks are less \
            canonical than current tip, or current sync-champion.",
                received_blocks.len()
            );
            return Ok(None);
        }

        // Send the new blocks to the main task which handles the state update
        // and storage to the database.
        let number_of_received_blocks = received_blocks.len();
        self.to_main_tx
            .send(PeerTaskToMain::NewBlocks(received_blocks))
            .await?;
        info!(
            "Updated block info by block from peer. block height {}",
            last_block_height
        );

        // Valuable, new, hard-to-produce information. Reward peer.
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
            .await?;

        Ok(Some(last_block_height))
    }
20✔
390

391
    /// Take a single block received from a peer and (attempt to) find a path
    /// between the received block and a common ancestor stored in the blocks
    /// database.
    ///
    /// This function attempts to find the parent of the received block, either
    /// by searching the database or by requesting it from a peer.
    ///  - If the parent is not stored, it is requested from the peer and the
    ///    received block is pushed to the fork reconciliation list for later
    ///    handling by this function. The fork reconciliation list starts out
    ///    empty, but grows as more parents are requested and transmitted.
    ///  - If the parent is found in the database, a) block handling continues:
    ///    the entire list of fork reconciliation blocks are passed down the
    ///    pipeline, potentially leading to a state update; and b) the fork
    ///    reconciliation list is cleared.
    ///
    /// Locking:
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    async fn try_ensure_path<S>(
        &mut self,
        received_block: Box<Block>,
        peer: &mut S,
        peer_state: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        // Does the received block match the fork reconciliation list?
        // The list holds blocks from high to low height, so the last entry is
        // the child ("successor") of the block we just received.
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
            peer_state.fork_reconciliation_blocks.last()
        {
            let valid = successor
                .is_valid(
                    received_block.as_ref(),
                    self.now(),
                    self.global_state_lock.cli().network,
                )
                .await;
            if !valid {
                warn!(
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
                        peer_state.fork_reconciliation_blocks.len() + 1
                    );
            }
            valid
        } else {
            // Empty list: nothing to match against yet.
            true
        };

        // Are we running out of RAM?
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
            >= self.global_state_lock.cli().sync_mode_threshold;
        if too_many_blocks {
            warn!(
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
                peer_state.fork_reconciliation_blocks.len() + 1
            );
        }

        // Block mismatch or too many blocks: abort!
        // Punish, drop all accumulated reconciliation state, and keep the
        // connection open unless the sanction banned the peer (`?`).
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
            self.punish(NegativePeerSanction::ForkResolutionError((
                received_block.header().height,
                peer_state.fork_reconciliation_blocks.len() as u16,
                received_block.hash(),
            )))
            .await?;
            peer_state.fork_reconciliation_blocks = vec![];
            return Ok(());
        }

        // otherwise, append
        peer_state.fork_reconciliation_blocks.push(*received_block);

        // Try fetch parent
        let received_block_header = *peer_state
            .fork_reconciliation_blocks
            .last()
            .unwrap()
            .header();

        let parent_digest = received_block_header.prev_block_digest;
        let parent_height = received_block_header.height.previous()
            .expect("transferred block must have previous height because genesis block cannot be transferred");
        debug!("Try ensure path: fetching parent block");
        let parent_block = self
            .global_state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_block(parent_digest)
            .await?;
        debug!(
            "Completed parent block fetching from DB: {}",
            if parent_block.is_some() {
                "found".to_string()
            } else {
                "not found".to_string()
            }
        );

        // If parent is not known (but not genesis) request it.
        let Some(parent_block) = parent_block else {
            // An unknown parent at genesis height means the peer is on a
            // chain with a different genesis block — irreconcilable.
            if parent_height.is_genesis() {
                peer_state.fork_reconciliation_blocks.clear();
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
                return Ok(());
            }
            info!(
                "Parent not known: Requesting previous block with height {} from peer",
                parent_height
            );

            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
                .await?;

            return Ok(());
        };

        // We want to treat the received fork reconciliation blocks (plus the
        // received block) in reverse order, from oldest to newest, because
        // they were requested from high to low block height.
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
        new_blocks.reverse();

        // Reset the fork resolution state since we got all the way back to a
        // block that we have.
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
        peer_state.fork_reconciliation_blocks.clear();

        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
            // If `BlockNotification` was received during a block reconciliation
            // event, then the peer might have one (or more (unlikely)) blocks
            // that we do not have. We should thus request those blocks.
            if fork_reconciliation_event
                && peer_state.highest_shared_block_height > new_block_height
            {
                peer.send(PeerMessage::BlockRequestByHeight(
                    peer_state.highest_shared_block_height,
                ))
                .await?;
            }
        }

        Ok(())
    }
32✔
540

541
    /// Handle peer messages and returns Ok(true) if connection should be closed.
542
    /// Connection should also be closed if an error is returned.
543
    /// Otherwise, returns OK(false).
544
    ///
545
    /// Locking:
546
    ///   * Acquires `global_state_lock` for read.
547
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
548
    ///     `self.reward(..)`.
549
    async fn handle_peer_message<S>(
147✔
550
        &mut self,
147✔
551
        msg: PeerMessage,
147✔
552
        peer: &mut S,
147✔
553
        peer_state_info: &mut MutablePeerState,
147✔
554
    ) -> Result<bool>
147✔
555
    where
147✔
556
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
147✔
557
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
147✔
558
        <S as TryStream>::Error: std::error::Error,
147✔
559
    {
147✔
560
        debug!(
147✔
561
            "Received {} from peer {}",
×
562
            msg.get_type(),
×
563
            self.peer_address
564
        );
565
        match msg {
147✔
566
            PeerMessage::Bye => {
567
                // Note that the current peer is not removed from the global_state.peer_map here
568
                // but that this is done by the caller.
569
                info!("Got bye. Closing connection to peer");
40✔
570
                Ok(DISCONNECT_CONNECTION)
40✔
571
            }
572
            PeerMessage::PeerListRequest => {
573
                let peer_info = {
5✔
574
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
575

576
                    // We are interested in the address on which peers accept ingoing connections,
577
                    // not in the address in which they are connected to us. We are only interested in
578
                    // peers that accept incoming connections.
579
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
580
                        .global_state_lock
5✔
581
                        .lock_guard()
5✔
582
                        .await
5✔
583
                        .net
584
                        .peer_map
585
                        .values()
5✔
586
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
587
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
588
                        .map(|peer_info| {
8✔
589
                            (
8✔
590
                                // unwrap is safe bc of above `filter`
8✔
591
                                peer_info.listen_address().unwrap(),
8✔
592
                                peer_info.instance_id(),
8✔
593
                            )
8✔
594
                        })
8✔
595
                        .collect();
5✔
596

597
                    // We sort the returned list, so this function is easier to test
598
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
599
                    peer_info
5✔
600
                };
601

602
                debug!("Responding with: {:?}", peer_info);
5✔
603
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
604
                Ok(KEEP_CONNECTION_ALIVE)
5✔
605
            }
606
            PeerMessage::PeerListResponse(peers) => {
3✔
607
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
608

609
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
610
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
611
                        .await?;
×
612
                }
3✔
613
                self.to_main_tx
3✔
614
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
615
                        peers,
3✔
616
                        self.peer_address,
3✔
617
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
618
                        self.distance + 1,
3✔
619
                    )))
3✔
620
                    .await?;
3✔
621
                Ok(KEEP_CONNECTION_ALIVE)
3✔
622
            }
623
            PeerMessage::BlockNotificationRequest => {
624
                debug!("Got BlockNotificationRequest");
×
625

626
                peer.send(PeerMessage::BlockNotification(
×
627
                    self.global_state_lock
×
628
                        .lock_guard()
×
629
                        .await
×
630
                        .chain
631
                        .light_state()
×
632
                        .into(),
×
633
                ))
634
                .await?;
×
635

636
                Ok(KEEP_CONNECTION_ALIVE)
×
637
            }
638
            PeerMessage::BlockNotification(block_notification) => {
16✔
639
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
640

641
                let (tip_header, sync_anchor_is_set) = {
16✔
642
                    let state = self.global_state_lock.lock_guard().await;
16✔
643
                    (
16✔
644
                        *state.chain.light_state().header(),
16✔
645
                        state.net.sync_anchor.is_some(),
16✔
646
                    )
16✔
647
                };
648
                debug!(
16✔
649
                    "Got BlockNotification of height {}. Own height is {}",
×
650
                    block_notification.height, tip_header.height
651
                );
652

653
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
654
                let now = self.now();
16✔
655
                let time_since_latest_successful_challenge = peer_state_info
16✔
656
                    .successful_sync_challenge_response_time
16✔
657
                    .map(|then| now - then);
16✔
658
                let cooldown_expired = time_since_latest_successful_challenge
16✔
659
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
660
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
661
                    &tip_header,
16✔
662
                    block_notification.height,
16✔
663
                    block_notification.cumulative_proof_of_work,
16✔
664
                    sync_mode_threshold,
16✔
665
                );
666
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
667
                    debug!("sync mode criterion satisfied.");
1✔
668

669
                    if peer_state_info.sync_challenge.is_some() {
1✔
670
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
671
                        return Ok(KEEP_CONNECTION_ALIVE);
×
672
                    }
1✔
673

674
                    info!(
1✔
675
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
676
                    );
677
                    let challenge = SyncChallenge::generate(
1✔
678
                        &block_notification,
1✔
679
                        tip_header.height,
1✔
680
                        self.rng.random(),
1✔
681
                    );
682
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
683
                        challenge,
1✔
684
                        block_notification.cumulative_proof_of_work,
1✔
685
                        self.now(),
1✔
686
                    ));
1✔
687

688
                    debug!("sending challenge ...");
1✔
689
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
690

691
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
692
                }
15✔
693

694
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
695
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
696
                    < block_notification.cumulative_proof_of_work;
15✔
697

698
                debug!("block_is_new: {}", block_is_new);
15✔
699

700
                if block_is_new
15✔
701
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
702
                    && !sync_anchor_is_set
14✔
703
                    && !exceeds_sync_mode_threshold
14✔
704
                {
705
                    debug!(
13✔
706
                        "sending BlockRequestByHeight to peer for block with height {}",
×
707
                        block_notification.height
708
                    );
709
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
710
                        .await?;
13✔
711
                } else {
712
                    debug!(
2✔
713
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
714
                        block_notification.height,
715
                        block_is_new,
716
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
717
                    );
718
                }
719

720
                Ok(KEEP_CONNECTION_ALIVE)
15✔
721
            }
722
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
723
                let response = {
×
724
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
725

726
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
727

728
                    let response = self
2✔
729
                        .global_state_lock
2✔
730
                        .lock_guard()
2✔
731
                        .await
2✔
732
                        .response_to_sync_challenge(sync_challenge)
2✔
733
                        .await;
2✔
734

735
                    match response {
2✔
736
                        Ok(resp) => resp,
×
737
                        Err(e) => {
2✔
738
                            warn!("could not generate sync challenge response:\n{e}");
2✔
739
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
740
                                .await?;
2✔
741
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
742
                        }
743
                    }
744
                };
745

746
                info!(
×
747
                    "Responding to sync challenge from {}",
×
748
                    self.peer_address.ip()
×
749
                );
750
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
751
                    .await?;
×
752

753
                Ok(KEEP_CONNECTION_ALIVE)
×
754
            }
755
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
756
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
757

758
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
759
                info!(
1✔
760
                    "Got sync challenge response from {}",
×
761
                    self.peer_address.ip()
×
762
                );
763

764
                // The purpose of the sync challenge and sync challenge response
765
                // is to avoid going into sync mode based on a malicious target
766
                // fork. Instead of verifying that the claimed proof-of-work
767
                // number is correct (which would require sending and verifying,
768
                // at least, all blocks between luca (whatever that is) and the
769
                // claimed tip), we use a heuristic that requires less
770
                // communication and less verification work. The downside of
771
                // using a heuristic here is a nonzero false positive and false
772
                // negative rate. Note that the false negative event
773
                // (maliciously sending someone into sync mode based on a bogus
774
                // fork) still requires a significant amount of work from the
775
                // attacker, *in addition* to being lucky. Also, down the line
776
                // succinctness (and more specifically, recursive block
777
                // validation) obviates this entire subprotocol.
778

779
                // Did we issue a challenge?
780
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
781
                    warn!("Sync challenge response was not prompted.");
×
782
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
783
                        .await?;
×
784
                    return Ok(KEEP_CONNECTION_ALIVE);
×
785
                };
786

787
                // Reset the challenge, regardless of the response's success.
788
                peer_state_info.sync_challenge = None;
1✔
789

790
                // Does response match issued challenge?
791
                if !challenge_response.matches(issued_challenge) {
1✔
792
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
793
                        .await?;
×
794
                    return Ok(KEEP_CONNECTION_ALIVE);
×
795
                }
1✔
796

797
                // Does response verify?
798
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
799
                let now = self.now();
1✔
800
                if !challenge_response
1✔
801
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
802
                    .await
1✔
803
                {
804
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
805
                        .await?;
×
806
                    return Ok(KEEP_CONNECTION_ALIVE);
×
807
                }
1✔
808

809
                // Does cumulative proof-of-work evolve reasonably?
810
                let own_tip_header = *self
1✔
811
                    .global_state_lock
1✔
812
                    .lock_guard()
1✔
813
                    .await
1✔
814
                    .chain
815
                    .light_state()
1✔
816
                    .header();
1✔
817
                if !challenge_response
1✔
818
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
819
                {
820
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
821
                        .await?;
×
822
                    return Ok(KEEP_CONNECTION_ALIVE);
×
823
                }
1✔
824

825
                // Is there some specific (*i.e.*, not aggregate) proof of work?
826
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
827
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
828
                        .await?;
×
829
                    return Ok(KEEP_CONNECTION_ALIVE);
×
830
                }
1✔
831

832
                // Did it come in time?
833
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
834
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
835
                        .await?;
×
836
                    return Ok(KEEP_CONNECTION_ALIVE);
×
837
                }
1✔
838

839
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
840
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
841

842
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
843
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
844

845
                // Inform main loop
846
                self.to_main_tx
1✔
847
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
848
                        peer_address: self.peer_address,
1✔
849
                        claimed_height: claimed_tip_height,
1✔
850
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
851
                        claimed_block_mmra: sync_mmra_anchor,
1✔
852
                    })
1✔
853
                    .await?;
1✔
854

855
                Ok(KEEP_CONNECTION_ALIVE)
1✔
856
            }
857
            PeerMessage::BlockRequestByHash(block_digest) => {
×
858
                let block = self
×
859
                    .global_state_lock
×
860
                    .lock_guard()
×
861
                    .await
×
862
                    .chain
863
                    .archival_state()
×
864
                    .get_block(block_digest)
×
865
                    .await?;
×
866

867
                match block {
×
868
                    None => {
869
                        // TODO: Consider punishing here
870
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
871
                        Ok(KEEP_CONNECTION_ALIVE)
×
872
                    }
873
                    Some(b) => {
×
874
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
875
                            .await?;
×
876
                        Ok(KEEP_CONNECTION_ALIVE)
×
877
                    }
878
                }
879
            }
880
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
881
                let block_response = {
15✔
882
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
883

884
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
885

886
                    let canonical_block_digest = self
16✔
887
                        .global_state_lock
16✔
888
                        .lock_guard()
16✔
889
                        .await
16✔
890
                        .chain
891
                        .archival_state()
16✔
892
                        .archival_block_mmr
893
                        .ammr()
16✔
894
                        .try_get_leaf(block_height.into())
16✔
895
                        .await;
16✔
896

897
                    let canonical_block_digest = match canonical_block_digest {
16✔
898
                        None => {
899
                            let own_tip_height = self
1✔
900
                                .global_state_lock
1✔
901
                                .lock_guard()
1✔
902
                                .await
1✔
903
                                .chain
904
                                .light_state()
1✔
905
                                .header()
1✔
906
                                .height;
907
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
908
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
909
                                .await?;
1✔
910

911
                            return Ok(KEEP_CONNECTION_ALIVE);
1✔
912
                        }
913
                        Some(digest) => digest,
15✔
914
                    };
915

916
                    let canonical_chain_block: Block = self
15✔
917
                        .global_state_lock
15✔
918
                        .lock_guard()
15✔
919
                        .await
15✔
920
                        .chain
921
                        .archival_state()
15✔
922
                        .get_block(canonical_block_digest)
15✔
923
                        .await?
15✔
924
                        .unwrap();
15✔
925

926
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
927
                };
928

929
                debug!("Sending block");
15✔
930
                peer.send(block_response).await?;
15✔
931
                debug!("Sent block");
15✔
932
                Ok(KEEP_CONNECTION_ALIVE)
15✔
933
            }
934
            PeerMessage::Block(t_block) => {
32✔
935
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
936

937
                info!(
32✔
938
                    "Got new block from peer {}, height {}, mined {}",
×
939
                    self.peer_address,
940
                    t_block.header.height,
941
                    t_block.header.timestamp.standard_format()
×
942
                );
943
                let new_block_height = t_block.header.height;
32✔
944

945
                let block = match Block::try_from(*t_block) {
32✔
946
                    Ok(block) => Box::new(block),
32✔
947
                    Err(e) => {
×
948
                        warn!("Peer sent invalid block: {e:?}");
×
949
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
950
                            .await?;
×
951

952
                        return Ok(KEEP_CONNECTION_ALIVE);
×
953
                    }
954
                };
955

956
                // Update the value for the highest known height that peer possesses iff
957
                // we are not in a fork reconciliation state.
958
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
959
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
960
                }
22✔
961

962
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
963

964
                // Reward happens as part of `try_ensure_path`
965

966
                Ok(KEEP_CONNECTION_ALIVE)
31✔
967
            }
968
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
969
                known_blocks,
8✔
970
                max_response_len,
8✔
971
                anchor,
8✔
972
            }) => {
973
                debug!(
8✔
974
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
975
                    self.peer_address
976
                );
977

978
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
979
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
980
                        .await?;
×
981

982
                    return Ok(KEEP_CONNECTION_ALIVE);
×
983
                }
8✔
984

985
                // The last block in the list of the peers known block is the
986
                // earliest block, block with lowest height, the peer has
987
                // requested. If it does not belong to canonical chain, none of
988
                // the later will. So we can do an early abort in that case.
989
                let least_preferred = match known_blocks.last() {
8✔
990
                    Some(least_preferred) => *least_preferred,
8✔
991
                    None => {
992
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
993
                            .await?;
×
994

995
                        return Ok(KEEP_CONNECTION_ALIVE);
×
996
                    }
997
                };
998

999
                let state = self.global_state_lock.lock_guard().await;
8✔
1000
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1001
                let luca_is_known = state
8✔
1002
                    .chain
8✔
1003
                    .archival_state()
8✔
1004
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1005
                    .await;
8✔
1006
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1007
                    drop(state);
×
1008
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1009
                        .await?;
×
1010
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1011

1012
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1013
                }
8✔
1014

1015
                // Happy case: At least *one* of the blocks referenced by peer
1016
                // is known to us.
1017
                let first_block_in_response = {
8✔
1018
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1019
                    for block_digest in known_blocks {
10✔
1020
                        if state
10✔
1021
                            .chain
10✔
1022
                            .archival_state()
10✔
1023
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1024
                            .await
10✔
1025
                        {
1026
                            let height = state
8✔
1027
                                .chain
8✔
1028
                                .archival_state()
8✔
1029
                                .get_block_header(block_digest)
8✔
1030
                                .await
8✔
1031
                                .unwrap()
8✔
1032
                                .height;
1033
                            first_block_in_response = Some(height);
8✔
1034
                            debug!(
8✔
1035
                                "Found block in canonical chain for batch response: {}",
×
1036
                                block_digest
1037
                            );
1038
                            break;
8✔
1039
                        }
2✔
1040
                    }
1041

1042
                    first_block_in_response
8✔
1043
                        .expect("existence of LUCA should have been established already.")
8✔
1044
                };
1045

1046
                debug!(
8✔
1047
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1048
                 Now building response from that height."
×
1049
                );
1050

1051
                // Get the relevant blocks, at most batch-size many, descending from the
1052
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1053
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1054
                let max_response_len = cmp::min(
8✔
1055
                    max_response_len,
8✔
1056
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1057
                );
1058
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1059
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1060

1061
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1062
                let response_start_height: u64 = first_block_in_response.into();
8✔
1063
                let mut i: u64 = 1;
8✔
1064
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1065
                    let block_height = response_start_height + i;
31✔
1066
                    match state
31✔
1067
                        .chain
31✔
1068
                        .archival_state()
31✔
1069
                        .archival_block_mmr
31✔
1070
                        .ammr()
31✔
1071
                        .try_get_leaf(block_height)
31✔
1072
                        .await
31✔
1073
                    {
1074
                        Some(digest) => {
23✔
1075
                            digests_of_returned_blocks.push(digest);
23✔
1076
                        }
23✔
1077
                        None => break,
8✔
1078
                    }
1079
                    i += 1;
23✔
1080
                }
1081

1082
                let mut returned_blocks: Vec<Block> =
8✔
1083
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1084
                for block_digest in digests_of_returned_blocks {
31✔
1085
                    let block = state
23✔
1086
                        .chain
23✔
1087
                        .archival_state()
23✔
1088
                        .get_block(block_digest)
23✔
1089
                        .await?
23✔
1090
                        .unwrap();
23✔
1091
                    returned_blocks.push(block);
23✔
1092
                }
1093

1094
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1095

1096
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1097
                drop(state);
8✔
1098

1099
                let Some(response) = response else {
8✔
1100
                    warn!("Unable to satisfy batch-block request");
×
1101
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1102
                        .await?;
×
1103
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1104
                };
1105

1106
                debug!("Returning {} blocks in batch response", response.len());
8✔
1107

1108
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1109
                peer.send(response).await?;
8✔
1110

1111
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1112
            }
1113
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1114
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1115

1116
                debug!(
×
1117
                    "handling block response batch with {} blocks",
×
1118
                    authenticated_blocks.len()
×
1119
                );
1120

1121
                // (Alan:) why is there even a minimum?
1122
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1123
                    warn!("Got smaller batch response than allowed");
×
1124
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1125
                        .await?;
×
1126
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1127
                }
×
1128

1129
                // Verify that we are in fact in syncing mode
1130
                // TODO: Separate peer messages into those allowed under syncing
1131
                // and those that are not
1132
                let Some(sync_anchor) = self
×
1133
                    .global_state_lock
×
1134
                    .lock_guard()
×
1135
                    .await
×
1136
                    .net
1137
                    .sync_anchor
1138
                    .clone()
×
1139
                else {
1140
                    warn!("Received a batch of blocks without being in syncing mode");
×
1141
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1142
                        .await?;
×
1143
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1144
                };
1145

1146
                // Verify that the response matches the current state
1147
                // We get the latest block from the DB here since this message is
1148
                // only valid for archival nodes.
1149
                let (first_block, _) = &authenticated_blocks[0];
×
1150
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1151
                let most_canonical_own_block_match: Option<Block> = self
×
1152
                    .global_state_lock
×
1153
                    .lock_guard()
×
1154
                    .await
×
1155
                    .chain
1156
                    .archival_state()
×
1157
                    .get_block(first_blocks_parent_digest)
×
1158
                    .await
×
1159
                    .expect("Block lookup must succeed");
×
1160
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1161
                    Some(block) => block,
×
1162
                    None => {
1163
                        warn!("Got batch response with invalid start block");
×
1164
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1165
                            .await?;
×
1166
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1167
                    }
1168
                };
1169

1170
                // Convert all blocks to Block objects
1171
                debug!(
×
1172
                    "Found own block of height {} to match received batch",
×
1173
                    most_canonical_own_block_match.kernel.header.height
×
1174
                );
1175
                let mut received_blocks = vec![];
×
1176
                for (t_block, membership_proof) in authenticated_blocks {
×
1177
                    let Ok(block) = Block::try_from(t_block) else {
×
1178
                        warn!("Received invalid transfer block from peer");
×
1179
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1180
                            .await?;
×
1181
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1182
                    };
1183

1184
                    if !membership_proof.verify(
×
1185
                        block.header().height.into(),
×
1186
                        block.hash(),
×
1187
                        &sync_anchor.block_mmr.peaks(),
×
1188
                        sync_anchor.block_mmr.num_leafs(),
×
1189
                    ) {
×
1190
                        warn!("Authentication of received block fails relative to anchor");
×
1191
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1192
                            .await?;
×
1193
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1194
                    }
×
1195

1196
                    received_blocks.push(block);
×
1197
                }
1198

1199
                // Get the latest block that we know of and handle all received blocks
1200
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1201
                    .await?;
×
1202

1203
                // Reward happens as part of `handle_blocks`.
1204

1205
                Ok(KEEP_CONNECTION_ALIVE)
×
1206
            }
1207
            PeerMessage::UnableToSatisfyBatchRequest => {
1208
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1209
                warn!(
×
1210
                    "Peer {} reports inability to satisfy batch request.",
×
1211
                    self.peer_address
1212
                );
1213

1214
                Ok(KEEP_CONNECTION_ALIVE)
×
1215
            }
1216
            PeerMessage::Handshake(_) => {
1217
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1218

1219
                // The handshake should have been sent during connection
1220
                // initialization. Here it is out of order at best, malicious at
1221
                // worst.
1222
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1223
                Ok(KEEP_CONNECTION_ALIVE)
×
1224
            }
1225
            PeerMessage::ConnectionStatus(_) => {
1226
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1227

1228
                // The connection status should have been sent during connection
1229
                // initialization. Here it is out of order at best, malicious at
1230
                // worst.
1231

1232
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1233
                Ok(KEEP_CONNECTION_ALIVE)
×
1234
            }
1235
            PeerMessage::Transaction(transaction) => {
5✔
1236
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
5✔
1237

1238
                debug!(
5✔
1239
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1240
                    transaction.kernel.inputs.len(),
×
1241
                    transaction.kernel.outputs.len(),
×
1242
                    transaction.kernel.mutator_set_hash
×
1243
                );
1244

1245
                let transaction: Transaction = (*transaction).into();
5✔
1246

1247
                // 1. If transaction is invalid, punish.
1248
                if !transaction
5✔
1249
                    .is_valid(self.global_state_lock.cli().network)
5✔
1250
                    .await
5✔
1251
                {
1252
                    warn!("Received invalid tx");
×
1253
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1254
                        .await?;
×
1255
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1256
                }
5✔
1257

1258
                // 2. If transaction has coinbase, punish.
1259
                // Transactions received from peers have not been mined yet.
1260
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
1261
                if transaction.kernel.coinbase.is_some() {
5✔
1262
                    warn!("Received non-mined transaction with coinbase.");
×
1263
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1264
                        .await?;
×
1265
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1266
                }
5✔
1267

1268
                // 3. If negative fee, punish.
1269
                if transaction.kernel.fee.is_negative() {
5✔
1270
                    warn!("Received negative-fee transaction.");
×
1271
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1272
                        .await?;
×
1273
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1274
                }
5✔
1275

1276
                // 4. If transaction is already known, ignore.
1277
                if self
5✔
1278
                    .global_state_lock
5✔
1279
                    .lock_guard()
5✔
1280
                    .await
5✔
1281
                    .mempool
1282
                    .contains_with_higher_proof_quality(
5✔
1283
                        transaction.kernel.txid(),
5✔
1284
                        transaction.proof.proof_quality()?,
5✔
1285
                    )
1286
                {
1287
                    warn!("Received transaction that was already known");
×
1288

1289
                    // We received a transaction that we *probably* haven't requested.
1290
                    // Consider punishing here, if this is abused.
1291
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1292
                }
5✔
1293

1294
                // 5. if transaction is not confirmable, punish.
1295
                let (tip, mutator_set_accumulator_after) = {
5✔
1296
                    let state = self.global_state_lock.lock_guard().await;
5✔
1297

1298
                    (
5✔
1299
                        state.chain.light_state().hash(),
5✔
1300
                        state.chain.light_state().mutator_set_accumulator_after(),
5✔
1301
                    )
5✔
1302
                };
1303
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
5✔
1304
                    warn!(
×
1305
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1306
                        transaction.kernel.txid()
×
1307
                    );
1308
                    // get fine-grained error code for informative logging
1309
                    let confirmability_error_code = transaction
×
1310
                        .kernel
×
1311
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1312
                    match confirmability_error_code {
×
1313
                        Ok(_) => unreachable!(),
×
1314
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1315
                            warn!("invalid removal record (at index {index})");
×
1316
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1317
                            let removal_record_error_code = invalid_removal_record
×
1318
                                .validate_inner(&mutator_set_accumulator_after);
×
1319
                            debug!(
×
1320
                                "Absolute index set of removal record {index}: {:?}",
×
1321
                                invalid_removal_record.absolute_indices
1322
                            );
1323
                            match removal_record_error_code {
×
1324
                                Ok(_) => unreachable!(),
×
1325
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1326
                                    debug!("invalid because membership proof is missing");
×
1327
                                }
1328
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1329
                                    chunk_index,
×
1330
                                }) => {
1331
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1332
                                }
1333
                            };
1334
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1335
                                .await?;
×
1336
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1337
                        }
1338
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1339
                            warn!("duplicate inputs");
×
1340
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1341
                                .await?;
×
1342
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1343
                        }
1344
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1345
                            warn!("already spent input (at index {index})");
×
1346
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1347
                                .await?;
×
1348
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1349
                        }
1350
                    };
1351
                }
5✔
1352

1353
                // If transaction cannot be applied to mutator set, punish.
1354
                // I don't think this can happen when above checks pass but we include
1355
                // the check to ensure that transaction can be applied.
1356
                let ms_update = MutatorSetUpdate::new(
5✔
1357
                    transaction.kernel.inputs.clone(),
5✔
1358
                    transaction.kernel.outputs.clone(),
5✔
1359
                );
1360
                let can_apply = ms_update
5✔
1361
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
5✔
1362
                    .is_ok();
5✔
1363
                if !can_apply {
5✔
1364
                    warn!("Cannot apply transaction to current mutator set.");
×
1365
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1366
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1367
                        .await?;
×
1368
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1369
                }
5✔
1370

1371
                let tx_timestamp = transaction.kernel.timestamp;
5✔
1372

1373
                // 6. Ignore if transaction is too old
1374
                let now = self.now();
5✔
1375
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
5✔
1376
                    // TODO: Consider punishing here
1377
                    warn!("Received too old tx");
×
1378
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1379
                }
5✔
1380

1381
                // 7. Ignore if transaction is too far into the future
1382
                if tx_timestamp
5✔
1383
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
5✔
1384
                {
1385
                    // TODO: Consider punishing here
1386
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1387
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1388
                }
5✔
1389

1390
                // Otherwise, relay to main
1391
                let pt2m_transaction = PeerTaskToMainTransaction {
5✔
1392
                    transaction,
5✔
1393
                    confirmable_for_block: tip,
5✔
1394
                };
5✔
1395
                self.to_main_tx
5✔
1396
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
5✔
1397
                    .await?;
5✔
1398

1399
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1400
            }
1401
            PeerMessage::TransactionNotification(tx_notification) => {
12✔
1402
                // addresses #457
1403
                // new scope for state read-lock to avoid holding across peer.send()
1404
                {
1405
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
12✔
1406

1407
                    // 1. Ignore if we already know this transaction, and
1408
                    // the proof quality is not higher than what we already know.
1409
                    let state = self.global_state_lock.lock_guard().await;
12✔
1410
                    let transaction_of_same_or_higher_proof_quality_is_known =
12✔
1411
                        state.mempool.contains_with_higher_proof_quality(
12✔
1412
                            tx_notification.txid,
12✔
1413
                            tx_notification.proof_quality,
12✔
1414
                        );
1415
                    if transaction_of_same_or_higher_proof_quality_is_known {
12✔
1416
                        debug!("transaction with same or higher proof quality was already known");
7✔
1417
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1418
                    }
5✔
1419

1420
                    // Only accept transactions that do not require executing
1421
                    // `update`.
1422
                    if state
5✔
1423
                        .chain
5✔
1424
                        .light_state()
5✔
1425
                        .mutator_set_accumulator_after()
5✔
1426
                        .hash()
5✔
1427
                        != tx_notification.mutator_set_hash
5✔
1428
                    {
1429
                        debug!("transaction refers to non-canonical mutator set state");
×
1430
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1431
                    }
5✔
1432
                }
1433

1434
                // 2. Request the actual `Transaction` from peer
1435
                debug!("requesting transaction from peer");
5✔
1436
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
5✔
1437
                    .await?;
5✔
1438

1439
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1440
            }
1441
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1442
                let state = self.global_state_lock.lock_guard().await;
5✔
1443
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1444
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1445
                };
1446

1447
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1448
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1449
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1450
                };
1451

1452
                // Drop state immediately to prevent holding over a response.
1453
                drop(state);
4✔
1454

1455
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1456
                    .await?;
4✔
1457

1458
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1459
            }
1460
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1461
                let verdict = self
1✔
1462
                    .global_state_lock
1✔
1463
                    .lock_guard()
1✔
1464
                    .await
1✔
1465
                    .favor_incoming_block_proposal(
1✔
1466
                        block_proposal_notification.height,
1✔
1467
                        block_proposal_notification.guesser_fee,
1✔
1468
                    );
1469
                match verdict {
1✔
1470
                    Ok(_) => {
1471
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1472
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1473
                        ))
1✔
1474
                        .await?
1✔
1475
                    }
1476
                    Err(reject_reason) => {
×
1477
                        info!(
×
1478
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1479
                        {}. Reason:\n{reject_reason}",
×
1480
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1481
                        self.peer_address
1482
                    )
1483
                    }
1484
                }
1485

1486
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1487
            }
1488
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1489
                let matching_proposal = self
×
1490
                    .global_state_lock
×
1491
                    .lock_guard()
×
1492
                    .await
×
1493
                    .mining_state
1494
                    .block_proposal
1495
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1496
                    .map(|x| x.to_owned());
×
1497
                if let Some(proposal) = matching_proposal {
×
1498
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1499
                        .await?;
×
1500
                } else {
1501
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1502
                        .await?;
×
1503
                }
1504

1505
                Ok(KEEP_CONNECTION_ALIVE)
×
1506
            }
1507
            PeerMessage::BlockProposal(block) => {
1✔
1508
                info!("Got block proposal from peer.");
1✔
1509

1510
                let should_punish = {
1✔
1511
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1512

1513
                    let (verdict, tip) = {
1✔
1514
                        let state = self.global_state_lock.lock_guard().await;
1✔
1515

1516
                        let verdict = state.favor_incoming_block_proposal(
1✔
1517
                            block.header().height,
1✔
1518
                            block.total_guesser_reward(),
1✔
1519
                        );
1✔
1520
                        let tip = state.chain.light_state().to_owned();
1✔
1521
                        (verdict, tip)
1✔
1522
                    };
1523

1524
                    if let Err(rejection_reason) = verdict {
1✔
1525
                        match rejection_reason {
×
1526
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1527
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1528
                                if Some(received) == current =>
×
1529
                            {
1530
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1531
                                None
×
1532
                            }
1533
                            _ => {
1534
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1535
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1536
                            }
1537
                        }
1538
                    } else {
1539
                        // Verify validity and that proposal is child of current tip
1540
                        if block
1✔
1541
                            .is_valid(&tip, self.now(), self.global_state_lock.cli().network)
1✔
1542
                            .await
1✔
1543
                        {
1544
                            None // all is well, no punishment.
1✔
1545
                        } else {
1546
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1547
                        }
1548
                    }
1549
                };
1550

1551
                if let Some(sanction) = should_punish {
1✔
1552
                    self.punish(sanction).await?;
×
1553
                } else {
1554
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1555
                        .await?;
1✔
1556

1557
                    // Valuable, new, hard-to-produce information. Reward peer.
1558
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1559
                }
1560

1561
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1562
            }
1563
        }
1564
    }
147✔
1565

1566
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1567
    ///
1568
    /// the channel could potentially fill up in which case the send() will
1569
    /// block until there is capacity.  we wrap the send() so we can log if
1570
    /// that ever happens to the extent it passes slow-scope threshold.
1571
    async fn send_to_main(
1✔
1572
        &self,
1✔
1573
        msg: PeerTaskToMain,
1✔
1574
        line: u32,
1✔
1575
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1576
        // we measure across the send() in case the channel ever fills up.
1577
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1578

1579
        self.to_main_tx.send(msg).await
1✔
1580
    }
1✔
1581

1582
    /// Handle message from main task. The boolean return value indicates if
    /// the connection should be closed.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write via Self::punish()
    async fn handle_main_task_message<S>(
        &mut self,
        msg: MainToPeerTask,
        peer: &mut S,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<bool>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        debug!("Handling {} message from main in peer loop", msg.get_type());
        match msg {
            MainToPeerTask::Block(block) => {
                // We don't currently differentiate whether a new block came from a peer, or from our
                // own miner. It's always shared through this logic.
                let new_block_height = block.kernel.header.height;
                // Only notify the peer if this block is higher than anything we
                // have already announced to it; avoids redundant notifications.
                if new_block_height > peer_state_info.highest_shared_block_height {
                    debug!("Sending PeerMessage::BlockNotification");
                    peer_state_info.highest_shared_block_height = new_block_height;
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
                        .await?;
                    debug!("Sent PeerMessage::BlockNotification");
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
                // Only ask one of the peers about the batch of blocks
                if batch_block_request.peer_addr_target != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                // Cap the requested batch size by both the protocol constant
                // and the locally configured sync-mode threshold.
                let max_response_len = std::cmp::min(
                    STANDARD_BLOCK_BATCH_SIZE,
                    self.global_state_lock.cli().sync_mode_threshold,
                );

                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: batch_block_request.known_blocks,
                    max_response_len,
                    anchor: batch_block_request.anchor_mmr,
                }))
                .await?;

                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");

                // Broadcast message; only act if it concerns this peer.
                if self.peer_address != socket_addr {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                self.punish(NegativePeerSanction::SynchronizationTimeout)
                    .await?;

                // If this peer failed the last synchronization attempt, we only
                // sanction, we don't disconnect.
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::MakePeerDiscoveryRequest => {
                // Ask this peer for its list of known peers.
                peer.send(PeerMessage::PeerListRequest).await?;
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::Disconnect(peer_address) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");

                // Only disconnect from the peer the main task requested a disconnect for.
                if peer_address != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }
                // Record the graceful disconnect before signaling loop exit.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::DisconnectAll() => {
                // Main task wants every peer loop to shut down; record ours.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
                // Targeted discovery: only the addressed peer loop sends.
                if target_socket_addr == self.peer_address {
                    peer.send(PeerMessage::PeerListRequest).await?;
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::TransactionNotification(transaction_notification) => {
                // Relay a transaction notification from main out to this peer.
                debug!("Sending PeerMessage::TransactionNotification");
                peer.send(PeerMessage::TransactionNotification(
                    transaction_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::TransactionNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
                // Relay a block-proposal notification from main out to this peer.
                debug!("Sending PeerMessage::BlockProposalNotification");
                peer.send(PeerMessage::BlockProposalNotification(
                    block_proposal_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::BlockProposalNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
        }
    }
1693

1694
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
    /// or a message from main over the main-to-peer-tasks broadcast channel.
    ///
    /// Runs until the peer closes the connection, a handler requests
    /// disconnect, or an error occurs; errors are propagated to the caller.
    async fn run<S>(
        &mut self,
        mut peer: S,
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        loop {
            select! {
                // Handle peer messages
                peer_message = peer.try_next() => {
                    let peer_address = self.peer_address;
                    // A stream error is fatal for this connection.
                    let peer_message = match peer_message {
                        Ok(message) => message,
                        Err(err) => {
                            let msg = format!("Error when receiving from peer: {peer_address}");
                            error!("{msg}. Error: {err}");
                            bail!("{msg}. Closing connection.");
                        }
                    };
                    // `None` means the peer ended the stream: exit gracefully.
                    let Some(peer_message) = peer_message else {
                        info!("Peer {peer_address} closed connection.");
                        break;
                    };

                    // Some message types are only meaningful in (or outside of)
                    // sync mode; filter them before dispatching.
                    let syncing =
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
                    let message_type = peer_message.get_type();
                    if peer_message.ignore_during_sync() && syncing {
                        debug!(
                            "Ignoring {message_type} message when syncing, from {peer_address}",
                        );
                        continue;
                    }
                    if peer_message.ignore_when_not_sync() && !syncing {
                        debug!(
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
                        );
                        continue;
                    }

                    // `Ok(true)` from the handler means "close connection".
                    match self
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
                        .await
                    {
                        Ok(false) => {}
                        Ok(true) => {
                            info!("Closing connection to {peer_address}");
                            break;
                        }
                        Err(err) => {
                            warn!("Closing connection to {peer_address} because of error {err}.");
                            bail!("{err}");
                        }
                    };
                }

                // Handle messages from main task
                main_msg_res = from_main_rx.recv() => {
                    // A failed broadcast receive indicates the main loop is
                    // gone; this task cannot continue meaningfully.
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
                        let err_msg = format!("Failed to read from main loop: {err}");
                        error!(err_msg);
                        panic!("{err_msg}");
                    });
                    // Errors from the handler are downgraded to "close the
                    // connection" rather than propagated.
                    let close_connection = self
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
                        .await
                        .unwrap_or_else(|err| {
                            warn!("handle_main_task_message returned an error: {err}");
                            true
                        });

                    if close_connection {
                        info!(
                            "handle_main_task_message is closing the connection to {}",
                            self.peer_address
                        );
                        break;
                    }
                }
            }
        }

        Ok(())
    }
41✔
1785

1786
    /// Function called before entering the peer loop. Reads the potentially stored
    /// peer standing from the database and does other book-keeping before entering
    /// its final resting place: the `peer_loop`. Note that the peer has already been
    /// accepted for a connection for this loop to be entered. So we don't need
    /// to check the standing again.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write
    pub(crate) async fn run_wrapper<S>(
        &mut self,
        mut peer: S,
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;

        let cli_args = self.global_state_lock.cli().clone();

        // Restore any previously recorded standing for this IP; a fresh
        // standing is used for peers we have never seen before.
        let standing = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_databases
            .peer_standings
            .get(self.peer_address.ip())
            .await
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));

        // Add peer to peer map
        let peer_connection_info = PeerConnectionInfo::new(
            self.peer_handshake_data.listen_port,
            self.peer_address,
            self.inbound_connection,
        );
        let new_peer = PeerInfo::new(
            peer_connection_info,
            &self.peer_handshake_data,
            SystemTime::now(),
            cli_args.peer_tolerance,
        )
        .with_standing(standing);

        // If timestamps are different, we currently just log a warning.
        // The check is symmetric: warn if either clock is ahead of the other
        // by more than the threshold.
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
        {
            let own_datetime_utc: DateTime<Utc> =
                new_peer.own_timestamp_connection_established.into();
            let peer_datetime_utc: DateTime<Utc> =
                new_peer.peer_timestamp_connection_established.into();
            warn!(
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
                new_peer.connected_address(),
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
        }

        // Multiple tasks might attempt to set up a connection concurrently. So
        // even though we've checked that this connection is allowed, this check
        // could have been invalidated by another task, for one accepting an
        // incoming connection from a peer we're currently connecting to. So we
        // need to make the check again while holding a write-lock, since
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
        // since it would have to be dropped before acquiring the write-lock.
        {
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
            let peer_map = &mut global_state.net.peer_map;
            // Reject duplicate instance IDs: same node, different socket.
            if peer_map
                .values()
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
            {
                bail!("Attempted to connect to already connected peer. Aborting connection.");
            }

            if peer_map.len() >= cli_args.max_num_peers {
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
            }

            if peer_map.contains_key(&self.peer_address) {
                // This shouldn't be possible, unless the peer reports a different instance ID than
                // for the other connection. Only a malignant client would do that.
                bail!("Already connected to peer. Aborting connection");
            }

            peer_map.insert(self.peer_address, new_peer);
        }

        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);

        // If peer indicates more canonical block, request a block notification to catch up ASAP
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
            > self
                .global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work
        {
            // Send block notification request to catch up ASAP, in case we're
            // behind the newly-connected peer.
            peer.send(PeerMessage::BlockNotificationRequest).await?;
        }

        let res = self.run(peer, from_main_rx, &mut peer_state).await;
        debug!("Exited peer loop for {}", self.peer_address);

        // Clean up the peer-map entry and notify main regardless of how the
        // loop exited.
        close_peer_connected_callback(
            self.global_state_lock.clone(),
            self.peer_address,
            &self.to_main_tx,
        )
        .await;

        debug!("Ending peer loop for {}", self.peer_address);

        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
        // feature to have for testing purposes.
        res
    }
31✔
1916

1917
    /// Register graceful peer disconnection in the global state.
1918
    ///
1919
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1920
    ///
1921
    /// # Locking:
1922
    ///   * acquires `global_state_lock` for write
1923
    ///
1924
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1925
    async fn register_peer_disconnection(&mut self) {
×
1926
        let peer_id = self.peer_handshake_data.instance_id;
×
1927
        self.global_state_lock
×
1928
            .lock_guard_mut()
×
1929
            .await
×
1930
            .net
1931
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1932
    }
×
1933
}
1934

1935
#[cfg(test)]
1936
#[cfg_attr(coverage_nightly, coverage(off))]
1937
mod tests {
1938
    use macro_rules_attr::apply;
1939
    use rand::rngs::StdRng;
1940
    use rand::Rng;
1941
    use rand::SeedableRng;
1942
    use tokio::sync::mpsc::error::TryRecvError;
1943
    use tracing_test::traced_test;
1944

1945
    use super::*;
1946
    use crate::config_models::cli_args;
1947
    use crate::config_models::network::Network;
1948
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
1949
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1950
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1951
    use crate::models::peer::transaction_notification::TransactionNotification;
1952
    use crate::models::state::mempool::TransactionOrigin;
1953
    use crate::models::state::tx_creation_config::TxCreationConfig;
1954
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1955
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1956
    use crate::tests::shared::fake_valid_block_for_tests;
1957
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1958
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1959
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1960
    use crate::tests::shared::get_dummy_socket_address;
1961
    use crate::tests::shared::get_test_genesis_setup;
1962
    use crate::tests::shared::invalid_empty_single_proof_transaction;
1963
    use crate::tests::shared::Action;
1964
    use crate::tests::shared::Mock;
1965
    use crate::tests::shared_tokio_runtime;
1966

1967
    #[traced_test]
1968
    #[apply(shared_tokio_runtime)]
1969
    async fn test_peer_loop_bye() -> Result<()> {
1970
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1971

1972
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1973
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1974

1975
        let peer_address = get_dummy_socket_address(2);
1976
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1977
        let mut peer_loop_handler =
1978
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1979
        peer_loop_handler
1980
            .run_wrapper(mock, from_main_rx_clone)
1981
            .await?;
1982

1983
        assert_eq!(
1984
            2,
1985
            state_lock.lock_guard().await.net.peer_map.len(),
1986
            "peer map length must be back to 2 after goodbye"
1987
        );
1988

1989
        Ok(())
1990
    }
1991

1992
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn test_peer_loop_peer_list() {
        // Scenario: a third peer sends a `PeerListRequest`. The response
        // must contain all three connected peers -- the two from the test
        // setup plus the requester itself -- and the peer map must shrink
        // back to two entries after the requester's `Bye`.
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
                .await
                .unwrap();

        // Sort the pre-connected peers by address so the expected response
        // order is deterministic.
        let mut peer_infos = state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .clone()
            .into_values()
            .collect::<Vec<_>>();
        peer_infos.sort_by_cached_key(|x| x.connected_address());
        let (peer_address0, instance_id0) = (
            peer_infos[0].connected_address(),
            peer_infos[0].instance_id(),
        );
        let (peer_address1, instance_id1) = (
            peer_infos[1].connected_address(),
            peer_infos[1].instance_id(),
        );

        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
        let expected_response = vec![
            (peer_address0, instance_id0),
            (peer_address1, instance_id1),
            (sa2, hsd2.instance_id),
        ];
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::PeerListRequest),
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        let mut peer_loop_handler =
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        assert_eq!(
            2,
            state_lock.lock_guard().await.net.peer_map.len(),
            "peer map must have length 2 after saying goodbye to peer 2"
        );
    }
2045

2046
    #[traced_test]
2047
    #[apply(shared_tokio_runtime)]
2048
    async fn different_genesis_test() -> Result<()> {
2049
        // In this scenario a peer provides another genesis block than what has been
2050
        // hardcoded. This should lead to the closing of the connection to this peer
2051
        // and a ban.
2052

2053
        let network = Network::Main;
2054
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2055
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2056
        assert_eq!(1000, state_lock.cli().peer_tolerance);
2057
        let peer_address = get_dummy_socket_address(0);
2058

2059
        // Although the database is empty, `get_latest_block` still returns the genesis block,
2060
        // since that block is hardcoded.
2061
        let mut different_genesis_block = state_lock
2062
            .lock_guard()
2063
            .await
2064
            .chain
2065
            .archival_state()
2066
            .get_tip()
2067
            .await;
2068

2069
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2070
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2071
            &different_genesis_block,
2072
            Timestamp::hours(1),
2073
            StdRng::seed_from_u64(5550001).random(),
2074
            network,
2075
        )
2076
        .await;
2077
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2078
            block_1_with_different_genesis.try_into().unwrap(),
2079
        )))]);
2080

2081
        let mut peer_loop_handler = PeerLoopHandler::new(
2082
            to_main_tx.clone(),
2083
            state_lock.clone(),
2084
            peer_address,
2085
            hsd,
2086
            true,
2087
            1,
2088
        );
2089
        let res = peer_loop_handler
2090
            .run_wrapper(mock, from_main_rx_clone)
2091
            .await;
2092
        assert!(
2093
            res.is_err(),
2094
            "run_wrapper must return failure when genesis is different"
2095
        );
2096

2097
        match to_main_rx1.recv().await {
2098
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2099
            _ => bail!("Must receive remove of peer block max height"),
2100
        }
2101

2102
        // Verify that no further message was sent to main loop
2103
        match to_main_rx1.try_recv() {
2104
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2105
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2106
        };
2107

2108
        drop(to_main_tx);
2109

2110
        let peer_standing = state_lock
2111
            .lock_guard()
2112
            .await
2113
            .net
2114
            .get_peer_standing_from_database(peer_address.ip())
2115
            .await;
2116
        assert_eq!(
2117
            -i32::from(state_lock.cli().peer_tolerance),
2118
            peer_standing.unwrap().standing
2119
        );
2120
        assert_eq!(
2121
            NegativePeerSanction::DifferentGenesis,
2122
            peer_standing.unwrap().latest_punishment.unwrap().0
2123
        );
2124

2125
        Ok(())
2126
    }
2127

2128
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
    {
        // Scenario: the *peer* initiates the disconnect by sending `Bye`.
        // In that case no disconnection time must be recorded for the peer
        // in the local networking state.
        let args = cli_args::Args::default();
        let network = args.network;
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
            get_test_genesis_setup(network, 0, args).await?;

        let peer_address = get_dummy_socket_address(0);
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
        let peer_id = peer_handshake_data.instance_id;
        let mut peer_loop_handler = PeerLoopHandler::new(
            to_main_tx,
            state_lock.clone(),
            peer_address,
            peer_handshake_data,
            true,
            1,
        );
        // The peer's only action is to say goodbye.
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;

        let global_state = state_lock.lock_guard().await;
        assert!(global_state
            .net
            .last_disconnection_time_of_peer(peer_id)
            .is_none());

        // Keep channel endpoints alive until the loop has finished.
        drop(to_main_rx);
        drop(from_main_tx);

        Ok(())
    }
2162

2163
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_without_valid_pow_test() -> Result<()> {
        // In this scenario, a block without a valid PoW is received. This block should be rejected
        // by the peer loop and a notification should never reach the main loop.

        let network = Network::Main;
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let peer_address = get_dummy_socket_address(0);
        let genesis_block: Block = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_tip()
            .await;

        // Make a block whose hash is (probably) above the implied
        // proof-of-work threshold.
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;

        // This *probably* is invalid PoW -- and needs to be for this test to
        // work.
        block_without_valid_pow.set_header_nonce(Digest::default());

        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_without_valid_pow.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            true,
            1,
            block_without_valid_pow.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .expect("sending (one) invalid block should not result in closed connection");

        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
            _ => bail!("Must receive remove of peer block max height"),
        }

        // Verify that no further message was sent to main loop
        match to_main_rx1.try_recv() {
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
        };

        // We need to have the transmitter in scope until we have received from it
        // otherwise the receiver will report the disconnected error when we attempt
        // to read from it. And the purpose is to verify that the channel is empty,
        // not that it has been closed.
        drop(to_main_tx);

        // Verify that peer standing was stored in database
        let standing = state_lock
            .lock_guard()
            .await
            .net
            .peer_databases
            .peer_standings
            .get(peer_address.ip())
            .await
            .unwrap();
        assert!(
            standing.standing < 0,
            "Peer must be sanctioned for sending a bad block"
        );

        Ok(())
    }
2253

2254
    #[traced_test]
2255
    #[apply(shared_tokio_runtime)]
2256
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2257
        // The scenario tested here is that a client receives a block that is already
2258
        // known and stored. The expected behavior is to ignore the block and not send
2259
        // a message to the main task.
2260

2261
        let network = Network::Main;
2262
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
2263
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2264
        let peer_address = get_dummy_socket_address(0);
2265
        let genesis_block: Block = Block::genesis(network);
2266

2267
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
2268
        let block_1 =
2269
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2270
        assert!(
2271
            block_1.is_valid(&genesis_block, now, network).await,
2272
            "Block must be valid for this test to make sense"
2273
        );
2274
        alice.set_new_tip(block_1.clone()).await?;
2275

2276
        let mock_peer_messages = Mock::new(vec![
2277
            Action::Read(PeerMessage::Block(Box::new(
2278
                block_1.clone().try_into().unwrap(),
2279
            ))),
2280
            Action::Read(PeerMessage::Bye),
2281
        ]);
2282

2283
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2284

2285
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2286
            to_main_tx.clone(),
2287
            alice.clone(),
2288
            peer_address,
2289
            hsd,
2290
            false,
2291
            1,
2292
            block_1.header().timestamp,
2293
        );
2294
        alice_peer_loop_handler
2295
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
2296
            .await?;
2297

2298
        match to_main_rx1.recv().await {
2299
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2300
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2301
        }
2302
        match to_main_rx1.try_recv() {
2303
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2304
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2305
        };
2306
        drop(to_main_tx);
2307

2308
        if !alice.lock_guard().await.net.peer_map.is_empty() {
2309
            bail!("peer map must be empty after closing connection gracefully");
2310
        }
2311

2312
        Ok(())
2313
    }
2314

2315
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_request_batch_simple() {
        // Scenario: Six blocks (including genesis) are known. Peer requests
        // from all possible starting points, and client responds with the
        // correct list of blocks.
        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default())
                .await
                .unwrap();
        let genesis_block: Block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        let [block_1, block_2, block_3, block_4, block_5] =
            fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;
        let blocks = vec![
            genesis_block,
            block_1,
            block_2,
            block_3,
            block_4,
            block_5.clone(),
        ];
        // Store all blocks except genesis, which index 0 of `blocks` holds.
        for block in blocks.iter().skip(1) {
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
        }

        // Anchor for the batch requests: the archival block-MMR accumulator
        // of the current state.
        let mmra = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        for i in 0..=4 {
            // Expected response: every block strictly after the known block
            // at index `i`.
            let expected_response = {
                let state = state_lock.lock_guard().await;
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
                    .await
                    .unwrap()
            };
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![blocks[i].hash()],
                    max_response_len: 14,
                    anchor: mmra.clone(),
                })),
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
                Action::Read(PeerMessage::Bye),
            ]);
            let mut peer_loop_handler = PeerLoopHandler::new(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd.clone(),
                false,
                1,
            );

            peer_loop_handler
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
                .await
                .unwrap();
        }
    }
2389

2390
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_request_batch_in_order_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
        // are returned.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block: Block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;
        // Competing fork branching off after block 1.
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
            network,
        )
        .await;
        assert_ne!(block_2_b.hash(), block_2_a.hash());

        // Tip ends up at 3_a since it is stored last.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        let anchor = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        // Expected batch when the peer only knows genesis: the full a-chain.
        let response_1 = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
                &anchor,
            )
            .await
            .unwrap()
        };

        let mut mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![genesis_block.hash()],
                max_response_len: 14,
                anchor: anchor.clone(),
            })),
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd.clone(),
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_1
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
            .await?;

        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
        let response_2 = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_2_a, block_3_a.clone()],
                &anchor,
            )
            .await
            .unwrap()
        };
        mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
                max_response_len: 14,
                anchor,
            })),
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_2
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
2505

2506
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn block_request_batch_out_of_order_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
        // The blocks will be supplied in the correct order but starting from the first digest in
        // the list that is known and canonical.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;
        // Competing fork branching off after block 1.
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
            network,
        )
        .await;
        assert_ne!(block_2_a.hash(), block_2_b.hash());

        // Tip ends up at 3_a since it is stored last.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
        expected_anchor.append(block_3_a.hash());
        let state_anchor = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        assert_eq!(
            expected_anchor, state_anchor,
            "Catching assumption about MMRA in tip and in archival state"
        );

        let response = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
                &expected_anchor,
            )
            .await
            .unwrap()
        };
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
                max_response_len: 14,
                anchor: expected_anchor,
            })),
            // Since genesis block is the 1st known in the list of known blocks,
            // it's immediate descendent, block_1, is the first one returned.
            Action::Write(PeerMessage::BlockResponseBatch(response)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_2
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
2597

2598
    #[traced_test]
2599
    #[apply(shared_tokio_runtime)]
2600
    async fn request_unknown_height_doesnt_crash() {
2601
        // Scenario: Only genesis block is known. Peer requests block of height
2602
        // 2.
2603
        let network = Network::Main;
2604
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2605
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2606
                .await
2607
                .unwrap();
2608
        let peer_address = get_dummy_socket_address(0);
2609
        let mock = Mock::new(vec![
2610
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2611
            Action::Read(PeerMessage::Bye),
2612
        ]);
2613

2614
        let mut peer_loop_handler = PeerLoopHandler::new(
2615
            to_main_tx.clone(),
2616
            state_lock.clone(),
2617
            peer_address,
2618
            hsd,
2619
            false,
2620
            1,
2621
        );
2622

2623
        // This will return error if seen read/write order does not match that of the
2624
        // mocked object.
2625
        peer_loop_handler
2626
            .run_wrapper(mock, from_main_rx_clone)
2627
            .await
2628
            .unwrap();
2629

2630
        // Verify that peer is sanctioned for this nonsense.
2631
        assert!(state_lock
2632
            .lock_guard()
2633
            .await
2634
            .net
2635
            .get_peer_standing_from_database(peer_address.ip())
2636
            .await
2637
            .unwrap()
2638
            .standing
2639
            .is_negative());
2640
    }
2641

2642
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
        // returned.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);

        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
            network,
        )
        .await;
        // Competing fork branching off after block 1.
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
            network,
        )
        .await;
        assert_ne!(block_2_a.hash(), block_2_b.hash());

        // Tip ends up at 3_a since it is stored last, making the a-chain
        // canonical.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        // Height requests must be answered with the canonical (a-chain)
        // blocks, not the b-chain ones.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
            Action::Write(PeerMessage::Block(Box::new(
                block_3_a.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        // This will return error if seen read/write order does not match that of the
        // mocked object.
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
2705

2706
    #[traced_test]
2707
    #[apply(shared_tokio_runtime)]
2708
    async fn receival_of_block_notification_height_1() {
2709
        // Scenario: client only knows genesis block. Then receives block
2710
        // notification of height 1. Must request block 1.
2711
        let network = Network::Main;
2712
        let mut rng = StdRng::seed_from_u64(5552401);
2713
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2714
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2715
                .await
2716
                .unwrap();
2717
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2718
        let notification_height1 = (&block_1).into();
2719
        let mock = Mock::new(vec![
2720
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
2721
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2722
            Action::Read(PeerMessage::Bye),
2723
        ]);
2724

2725
        let peer_address = get_dummy_socket_address(0);
2726
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2727
            to_main_tx.clone(),
2728
            state_lock.clone(),
2729
            peer_address,
2730
            hsd,
2731
            false,
2732
            1,
2733
            block_1.header().timestamp,
2734
        );
2735
        peer_loop_handler
2736
            .run_wrapper(mock, from_main_rx_clone)
2737
            .await
2738
            .unwrap();
2739

2740
        drop(to_main_rx1);
2741
    }
2742

2743
    #[traced_test]
2744
    #[apply(shared_tokio_runtime)]
2745
    async fn receive_block_request_by_height_block_7() {
2746
        // Scenario: client only knows blocks up to height 7. Then receives block-
2747
        // request-by-height for height 7. Must respond with block 7.
2748
        let network = Network::Main;
2749
        let mut rng = StdRng::seed_from_u64(5552401);
2750
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
2751
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2752
                .await
2753
                .unwrap();
2754
        let genesis_block = Block::genesis(network);
2755
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2756
            &genesis_block,
2757
            Timestamp::hours(1),
2758
            rng.random(),
2759
            network,
2760
        )
2761
        .await;
2762
        let block7 = blocks.last().unwrap().to_owned();
2763
        let tip_height: u64 = block7.header().height.into();
2764
        assert_eq!(7, tip_height);
2765

2766
        for block in &blocks {
2767
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
2768
        }
2769

2770
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2771
        let mock = Mock::new(vec![
2772
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2773
            Action::Write(block7_response),
2774
            Action::Read(PeerMessage::Bye),
2775
        ]);
2776

2777
        let peer_address = get_dummy_socket_address(0);
2778
        let mut peer_loop_handler = PeerLoopHandler::new(
2779
            to_main_tx.clone(),
2780
            state_lock.clone(),
2781
            peer_address,
2782
            hsd,
2783
            false,
2784
            1,
2785
        );
2786
        peer_loop_handler
2787
            .run_wrapper(mock, from_main_rx_clone)
2788
            .await
2789
            .unwrap();
2790

2791
        drop(to_main_rx1);
2792
    }
2793

2794
    #[traced_test]
2795
    #[apply(shared_tokio_runtime)]
2796
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2797
        // Scenario: client only knows genesis block. Then receives block 1.
2798

2799
        let network = Network::Main;
2800
        let mut rng = StdRng::seed_from_u64(5550001);
2801
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2802
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2803
        let peer_address = get_dummy_socket_address(0);
2804

2805
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2806
        let mock = Mock::new(vec![
2807
            Action::Read(PeerMessage::Block(Box::new(
2808
                block_1.clone().try_into().unwrap(),
2809
            ))),
2810
            Action::Read(PeerMessage::Bye),
2811
        ]);
2812

2813
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2814
            to_main_tx.clone(),
2815
            state_lock.clone(),
2816
            peer_address,
2817
            hsd,
2818
            false,
2819
            1,
2820
            block_1.header().timestamp,
2821
        );
2822
        peer_loop_handler
2823
            .run_wrapper(mock, from_main_rx_clone)
2824
            .await?;
2825

2826
        // Verify that a block was sent to `main_loop`
2827
        match to_main_rx1.recv().await {
2828
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
2829
            _ => bail!("Did not find msg sent to main task"),
2830
        };
2831

2832
        match to_main_rx1.recv().await {
2833
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2834
            _ => bail!("Must receive remove of peer block max height"),
2835
        }
2836

2837
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2838
            bail!("peer map must be empty after closing connection gracefully");
2839
        }
2840

2841
        Ok(())
2842
    }
2843

2844
    #[traced_test]
2845
    #[apply(shared_tokio_runtime)]
2846
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
2847
        // In this scenario, the client only knows the genesis block (block 0) and then
2848
        // receives block 2, meaning that block 1 will have to be requested.
2849

2850
        let network = Network::Main;
2851
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2852
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2853
        let peer_address = get_dummy_socket_address(0);
2854
        let genesis_block: Block = state_lock
2855
            .lock_guard()
2856
            .await
2857
            .chain
2858
            .archival_state()
2859
            .get_tip()
2860
            .await;
2861
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
2862
            &genesis_block,
2863
            Timestamp::hours(1),
2864
            StdRng::seed_from_u64(5550001).random(),
2865
            network,
2866
        )
2867
        .await;
2868

2869
        let mock = Mock::new(vec![
2870
            Action::Read(PeerMessage::Block(Box::new(
2871
                block_2.clone().try_into().unwrap(),
2872
            ))),
2873
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
2874
            Action::Read(PeerMessage::Block(Box::new(
2875
                block_1.clone().try_into().unwrap(),
2876
            ))),
2877
            Action::Read(PeerMessage::Bye),
2878
        ]);
2879

2880
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2881
            to_main_tx.clone(),
2882
            state_lock.clone(),
2883
            peer_address,
2884
            hsd,
2885
            true,
2886
            1,
2887
            block_2.header().timestamp,
2888
        );
2889
        peer_loop_handler
2890
            .run_wrapper(mock, from_main_rx_clone)
2891
            .await?;
2892

2893
        match to_main_rx1.recv().await {
2894
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
2895
                if blocks[0].hash() != block_1.hash() {
2896
                    bail!("1st received block by main loop must be block 1");
2897
                }
2898
                if blocks[1].hash() != block_2.hash() {
2899
                    bail!("2nd received block by main loop must be block 2");
2900
                }
2901
            }
2902
            _ => bail!("Did not find msg sent to main task 1"),
2903
        };
2904
        match to_main_rx1.recv().await {
2905
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2906
            _ => bail!("Must receive remove of peer block max height"),
2907
        }
2908

2909
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2910
            bail!("peer map must be empty after closing connection gracefully");
2911
        }
2912

2913
        Ok(())
2914
    }
2915

2916
    #[traced_test]
2917
    #[apply(shared_tokio_runtime)]
2918
    async fn prevent_ram_exhaustion_test() -> Result<()> {
2919
        // In this scenario the peer sends more blocks than the client allows to store in the
2920
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
2921
        // process as the alternative is that the program will crash because it runs out of RAM.
2922

2923
        let network = Network::Main;
2924
        let mut rng = StdRng::seed_from_u64(5550001);
2925
        let (
2926
            _peer_broadcast_tx,
2927
            from_main_rx_clone,
2928
            to_main_tx,
2929
            mut to_main_rx1,
2930
            mut state_lock,
2931
            _hsd,
2932
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
2933
        let genesis_block = Block::genesis(network);
2934

2935
        // Restrict max number of blocks held in memory to 2.
2936
        let mut cli = state_lock.cli().clone();
2937
        cli.sync_mode_threshold = 2;
2938
        state_lock.set_cli(cli).await;
2939

2940
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
2941
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
2942
            &genesis_block,
2943
            Timestamp::hours(1),
2944
            rng.random(),
2945
            network,
2946
        )
2947
        .await;
2948
        state_lock.set_new_tip(block_1.clone()).await?;
2949

2950
        let mock = Mock::new(vec![
2951
            Action::Read(PeerMessage::Block(Box::new(
2952
                block_4.clone().try_into().unwrap(),
2953
            ))),
2954
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
2955
            Action::Read(PeerMessage::Block(Box::new(
2956
                block_3.clone().try_into().unwrap(),
2957
            ))),
2958
            Action::Read(PeerMessage::Bye),
2959
        ]);
2960

2961
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2962
            to_main_tx.clone(),
2963
            state_lock.clone(),
2964
            peer_address1,
2965
            hsd1,
2966
            true,
2967
            1,
2968
            block_4.header().timestamp,
2969
        );
2970
        peer_loop_handler
2971
            .run_wrapper(mock, from_main_rx_clone)
2972
            .await?;
2973

2974
        match to_main_rx1.recv().await {
2975
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2976
            _ => bail!("Must receive remove of peer block max height"),
2977
        }
2978

2979
        // Verify that no block is sent to main loop.
2980
        match to_main_rx1.try_recv() {
2981
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2982
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
2983
        };
2984
        drop(to_main_tx);
2985

2986
        // Verify that peer is sanctioned for failed fork reconciliation attempt
2987
        assert!(state_lock
2988
            .lock_guard()
2989
            .await
2990
            .net
2991
            .get_peer_standing_from_database(peer_address1.ip())
2992
            .await
2993
            .unwrap()
2994
            .standing
2995
            .is_negative());
2996

2997
        Ok(())
2998
    }
2999

3000
    #[traced_test]
3001
    #[apply(shared_tokio_runtime)]
3002
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3003
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3004
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
3005

3006
        let network = Network::Main;
3007
        let (
3008
            _peer_broadcast_tx,
3009
            from_main_rx_clone,
3010
            to_main_tx,
3011
            mut to_main_rx1,
3012
            mut state_lock,
3013
            hsd,
3014
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3015
            .await
3016
            .unwrap();
3017
        let peer_address: SocketAddr = get_dummy_socket_address(0);
3018
        let genesis_block = Block::genesis(network);
3019
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3020
            &genesis_block,
3021
            Timestamp::hours(1),
3022
            StdRng::seed_from_u64(5550001).random(),
3023
            network,
3024
        )
3025
        .await;
3026
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
3027

3028
        let mock = Mock::new(vec![
3029
            Action::Read(PeerMessage::Block(Box::new(
3030
                block_4.clone().try_into().unwrap(),
3031
            ))),
3032
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3033
            Action::Read(PeerMessage::Block(Box::new(
3034
                block_3.clone().try_into().unwrap(),
3035
            ))),
3036
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3037
            Action::Read(PeerMessage::Block(Box::new(
3038
                block_2.clone().try_into().unwrap(),
3039
            ))),
3040
            Action::Read(PeerMessage::Bye),
3041
        ]);
3042

3043
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3044
            to_main_tx.clone(),
3045
            state_lock.clone(),
3046
            peer_address,
3047
            hsd,
3048
            true,
3049
            1,
3050
            block_4.header().timestamp,
3051
        );
3052
        peer_loop_handler
3053
            .run_wrapper(mock, from_main_rx_clone)
3054
            .await
3055
            .unwrap();
3056

3057
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3058
            panic!("Did not find msg sent to main task");
3059
        };
3060
        assert_eq!(blocks[0].hash(), block_2.hash());
3061
        assert_eq!(blocks[1].hash(), block_3.hash());
3062
        assert_eq!(blocks[2].hash(), block_4.hash());
3063

3064
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3065
            panic!("Must receive remove of peer block max height");
3066
        };
3067

3068
        assert!(
3069
            state_lock.lock_guard().await.net.peer_map.is_empty(),
3070
            "peer map must be empty after closing connection gracefully"
3071
        );
3072
    }
3073

3074
    #[traced_test]
3075
    #[apply(shared_tokio_runtime)]
3076
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
3077
        // In this scenario, the client only knows the genesis block (block 0) and then
3078
        // receives block 3, meaning that block 2 and 1 will have to be requested.
3079

3080
        let network = Network::Main;
3081
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
3082
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3083
        let peer_address = get_dummy_socket_address(0);
3084
        let genesis_block = Block::genesis(network);
3085

3086
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
3087
            &genesis_block,
3088
            Timestamp::hours(1),
3089
            StdRng::seed_from_u64(5550001).random(),
3090
            network,
3091
        )
3092
        .await;
3093

3094
        let mock = Mock::new(vec![
3095
            Action::Read(PeerMessage::Block(Box::new(
3096
                block_3.clone().try_into().unwrap(),
3097
            ))),
3098
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3099
            Action::Read(PeerMessage::Block(Box::new(
3100
                block_2.clone().try_into().unwrap(),
3101
            ))),
3102
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3103
            Action::Read(PeerMessage::Block(Box::new(
3104
                block_1.clone().try_into().unwrap(),
3105
            ))),
3106
            Action::Read(PeerMessage::Bye),
3107
        ]);
3108

3109
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3110
            to_main_tx.clone(),
3111
            state_lock.clone(),
3112
            peer_address,
3113
            hsd,
3114
            true,
3115
            1,
3116
            block_3.header().timestamp,
3117
        );
3118
        peer_loop_handler
3119
            .run_wrapper(mock, from_main_rx_clone)
3120
            .await?;
3121

3122
        match to_main_rx1.recv().await {
3123
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3124
                if blocks[0].hash() != block_1.hash() {
3125
                    bail!("1st received block by main loop must be block 1");
3126
                }
3127
                if blocks[1].hash() != block_2.hash() {
3128
                    bail!("2nd received block by main loop must be block 2");
3129
                }
3130
                if blocks[2].hash() != block_3.hash() {
3131
                    bail!("3rd received block by main loop must be block 3");
3132
                }
3133
            }
3134
            _ => bail!("Did not find msg sent to main task"),
3135
        };
3136
        match to_main_rx1.recv().await {
3137
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3138
            _ => bail!("Must receive remove of peer block max height"),
3139
        }
3140

3141
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3142
            bail!("peer map must be empty after closing connection gracefully");
3143
        }
3144

3145
        Ok(())
3146
    }
3147

3148
    #[traced_test]
3149
    #[apply(shared_tokio_runtime)]
3150
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3151
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3152
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3153
        // But the requests are interrupted by the peer sending another message: a new block
3154
        // notification.
3155

3156
        let network = Network::Main;
3157
        let (
3158
            _peer_broadcast_tx,
3159
            from_main_rx_clone,
3160
            to_main_tx,
3161
            mut to_main_rx1,
3162
            mut state_lock,
3163
            hsd,
3164
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3165
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3166
        let genesis_block: Block = state_lock
3167
            .lock_guard()
3168
            .await
3169
            .chain
3170
            .archival_state()
3171
            .get_tip()
3172
            .await;
3173

3174
        let [block_1, block_2, block_3, block_4, block_5] =
3175
            fake_valid_sequence_of_blocks_for_tests(
3176
                &genesis_block,
3177
                Timestamp::hours(1),
3178
                StdRng::seed_from_u64(5550001).random(),
3179
                network,
3180
            )
3181
            .await;
3182
        state_lock.set_new_tip(block_1.clone()).await?;
3183

3184
        let mock = Mock::new(vec![
3185
            Action::Read(PeerMessage::Block(Box::new(
3186
                block_4.clone().try_into().unwrap(),
3187
            ))),
3188
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3189
            Action::Read(PeerMessage::Block(Box::new(
3190
                block_3.clone().try_into().unwrap(),
3191
            ))),
3192
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3193
            //
3194
            // Now make the interruption of the block reconciliation process
3195
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3196
            //
3197
            // Complete the block reconciliation process by requesting the last block
3198
            // in this process, to get back to a mutually known block.
3199
            Action::Read(PeerMessage::Block(Box::new(
3200
                block_2.clone().try_into().unwrap(),
3201
            ))),
3202
            //
3203
            // Then anticipate the request of the block that was announced
3204
            // in the interruption.
3205
            // Note that we cannot anticipate the response, as only the main
3206
            // task writes to the database. And the database needs to be updated
3207
            // for the handling of block 5 to be done correctly.
3208
            Action::Write(PeerMessage::BlockRequestByHeight(
3209
                block_5.kernel.header.height,
3210
            )),
3211
            Action::Read(PeerMessage::Bye),
3212
        ]);
3213

3214
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3215
            to_main_tx.clone(),
3216
            state_lock.clone(),
3217
            peer_socket_address,
3218
            hsd,
3219
            false,
3220
            1,
3221
            block_5.header().timestamp,
3222
        );
3223
        peer_loop_handler
3224
            .run_wrapper(mock, from_main_rx_clone)
3225
            .await?;
3226

3227
        match to_main_rx1.recv().await {
3228
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3229
                if blocks[0].hash() != block_2.hash() {
3230
                    bail!("1st received block by main loop must be block 1");
3231
                }
3232
                if blocks[1].hash() != block_3.hash() {
3233
                    bail!("2nd received block by main loop must be block 2");
3234
                }
3235
                if blocks[2].hash() != block_4.hash() {
3236
                    bail!("3rd received block by main loop must be block 3");
3237
                }
3238
            }
3239
            _ => bail!("Did not find msg sent to main task"),
3240
        };
3241
        match to_main_rx1.recv().await {
3242
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3243
            _ => bail!("Must receive remove of peer block max height"),
3244
        }
3245

3246
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3247
            bail!("peer map must be empty after closing connection gracefully");
3248
        }
3249

3250
        Ok(())
3251
    }
3252

3253
    #[traced_test]
3254
    #[apply(shared_tokio_runtime)]
3255
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3256
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
3257
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3258
        // But the requests are interrupted by the peer sending another message: a request
3259
        // for a list of peers.
3260

3261
        let network = Network::Main;
3262
        let (
3263
            _peer_broadcast_tx,
3264
            from_main_rx_clone,
3265
            to_main_tx,
3266
            mut to_main_rx1,
3267
            mut state_lock,
3268
            _hsd,
3269
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3270
        let genesis_block = Block::genesis(network);
3271
        let peer_infos: Vec<PeerInfo> = state_lock
3272
            .lock_guard()
3273
            .await
3274
            .net
3275
            .peer_map
3276
            .clone()
3277
            .into_values()
3278
            .collect::<Vec<_>>();
3279

3280
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3281
            &genesis_block,
3282
            Timestamp::hours(1),
3283
            StdRng::seed_from_u64(5550001).random(),
3284
            network,
3285
        )
3286
        .await;
3287
        state_lock.set_new_tip(block_1.clone()).await?;
3288

3289
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3290
        let expected_peer_list_resp = vec![
3291
            (
3292
                peer_infos[0].listen_address().unwrap(),
3293
                peer_infos[0].instance_id(),
3294
            ),
3295
            (sa_1, hsd_1.instance_id),
3296
        ];
3297
        let mock = Mock::new(vec![
3298
            Action::Read(PeerMessage::Block(Box::new(
3299
                block_4.clone().try_into().unwrap(),
3300
            ))),
3301
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3302
            Action::Read(PeerMessage::Block(Box::new(
3303
                block_3.clone().try_into().unwrap(),
3304
            ))),
3305
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3306
            //
3307
            // Now make the interruption of the block reconciliation process
3308
            Action::Read(PeerMessage::PeerListRequest),
3309
            //
3310
            // Answer the request for a peer list
3311
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3312
            //
3313
            // Complete the block reconciliation process by requesting the last block
3314
            // in this process, to get back to a mutually known block.
3315
            Action::Read(PeerMessage::Block(Box::new(
3316
                block_2.clone().try_into().unwrap(),
3317
            ))),
3318
            Action::Read(PeerMessage::Bye),
3319
        ]);
3320

3321
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3322
            to_main_tx,
3323
            state_lock.clone(),
3324
            sa_1,
3325
            hsd_1,
3326
            true,
3327
            1,
3328
            block_4.header().timestamp,
3329
        );
3330
        peer_loop_handler
3331
            .run_wrapper(mock, from_main_rx_clone)
3332
            .await?;
3333

3334
        // Verify that blocks are sent to `main_loop` in expected ordering
3335
        match to_main_rx1.recv().await {
3336
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3337
                if blocks[0].hash() != block_2.hash() {
3338
                    bail!("1st received block by main loop must be block 1");
3339
                }
3340
                if blocks[1].hash() != block_3.hash() {
3341
                    bail!("2nd received block by main loop must be block 2");
3342
                }
3343
                if blocks[2].hash() != block_4.hash() {
3344
                    bail!("3rd received block by main loop must be block 3");
3345
                }
3346
            }
3347
            _ => bail!("Did not find msg sent to main task"),
3348
        };
3349

3350
        assert_eq!(
3351
            1,
3352
            state_lock.lock_guard().await.net.peer_map.len(),
3353
            "One peer must remain in peer list after peer_1 closed gracefully"
3354
        );
3355

3356
        Ok(())
3357
    }
3358

3359
    #[traced_test]
3360
    #[apply(shared_tokio_runtime)]
3361
    async fn receive_transaction_request() {
3362
        let network = Network::Main;
3363
        let dummy_tx = invalid_empty_single_proof_transaction();
3364
        let txid = dummy_tx.kernel.txid();
3365

3366
        for transaction_is_known in [false, true] {
3367
            let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3368
                get_test_genesis_setup(network, 1, cli_args::Args::default())
3369
                    .await
3370
                    .unwrap();
3371
            if transaction_is_known {
3372
                state_lock
3373
                    .lock_guard_mut()
3374
                    .await
3375
                    .mempool_insert(dummy_tx.clone(), TransactionOrigin::Own)
3376
                    .await;
3377
            }
3378

3379
            let mock = if transaction_is_known {
3380
                Mock::new(vec![
3381
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3382
                    Action::Write(PeerMessage::Transaction(Box::new(
3383
                        (&dummy_tx).try_into().unwrap(),
3384
                    ))),
3385
                    Action::Read(PeerMessage::Bye),
3386
                ])
3387
            } else {
3388
                Mock::new(vec![
3389
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3390
                    Action::Read(PeerMessage::Bye),
3391
                ])
3392
            };
3393

3394
            let hsd = get_dummy_handshake_data_for_genesis(network);
3395
            let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3396
            let mut peer_loop_handler = PeerLoopHandler::new(
3397
                to_main_tx,
3398
                state_lock,
3399
                get_dummy_socket_address(0),
3400
                hsd,
3401
                true,
3402
                1,
3403
            );
3404

3405
            peer_loop_handler
3406
                .run(mock, from_main_rx, &mut peer_state)
3407
                .await
3408
                .unwrap();
3409
        }
3410
    }
3411

3412
    #[traced_test]
3413
    #[apply(shared_tokio_runtime)]
3414
    async fn empty_mempool_request_tx_test() {
3415
        // In this scenario the client receives a transaction notification from
3416
        // a peer of a transaction it doesn't know; the client must then request it.
3417

3418
        let network = Network::Main;
3419
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
3420
            get_test_genesis_setup(network, 1, cli_args::Args::default())
3421
                .await
3422
                .unwrap();
3423

3424
        let spending_key = state_lock
3425
            .lock_guard()
3426
            .await
3427
            .wallet_state
3428
            .wallet_entropy
3429
            .nth_symmetric_key_for_tests(0);
3430
        let genesis_block = Block::genesis(network);
3431
        let now = genesis_block.kernel.header.timestamp;
3432
        let config = TxCreationConfig::default()
3433
            .recover_change_off_chain(spending_key.into())
3434
            .with_prover_capability(TxProvingCapability::ProofCollection);
3435
        let transaction_1: Transaction = state_lock
3436
            .api()
3437
            .tx_initiator_internal()
3438
            .create_transaction(
3439
                Default::default(),
3440
                NativeCurrencyAmount::coins(0),
3441
                now,
3442
                config,
3443
            )
3444
            .await
3445
            .unwrap()
3446
            .transaction
3447
            .into();
3448

3449
        // Build the resulting transaction notification
3450
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3451
        let mock = Mock::new(vec![
3452
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3453
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3454
            Action::Read(PeerMessage::Transaction(Box::new(
3455
                (&transaction_1).try_into().unwrap(),
3456
            ))),
3457
            Action::Read(PeerMessage::Bye),
3458
        ]);
3459

3460
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3461

3462
        // Mock a timestamp to allow transaction to be considered valid
3463
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3464
            to_main_tx,
3465
            state_lock.clone(),
3466
            get_dummy_socket_address(0),
3467
            hsd_1.clone(),
3468
            true,
3469
            1,
3470
            now,
3471
        );
3472

3473
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3474

3475
        assert!(
3476
            state_lock.lock_guard().await.mempool.is_empty(),
3477
            "Mempool must be empty at init"
3478
        );
3479
        peer_loop_handler
3480
            .run(mock, from_main_rx_clone, &mut peer_state)
3481
            .await
3482
            .unwrap();
3483

3484
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3485
        // by the `main_loop`.
3486
        match to_main_rx1.recv().await {
3487
            Some(PeerTaskToMain::Transaction(_)) => (),
3488
            _ => panic!("Must receive remove of peer block max height"),
3489
        };
3490
    }
    #[traced_test]
3493
    #[apply(shared_tokio_runtime)]
3494
    async fn populated_mempool_request_tx_test() -> Result<()> {
3495
        // In this scenario the peer is informed of a transaction that it already knows
3496

3497
        let network = Network::Main;
3498
        let (
3499
            _peer_broadcast_tx,
3500
            from_main_rx_clone,
3501
            to_main_tx,
3502
            mut to_main_rx1,
3503
            mut state_lock,
3504
            _hsd,
3505
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3506
            .await
3507
            .unwrap();
3508
        let spending_key = state_lock
3509
            .lock_guard()
3510
            .await
3511
            .wallet_state
3512
            .wallet_entropy
3513
            .nth_symmetric_key_for_tests(0);
3514

3515
        let genesis_block = Block::genesis(network);
3516
        let now = genesis_block.kernel.header.timestamp;
3517
        let config = TxCreationConfig::default()
3518
            .recover_change_off_chain(spending_key.into())
3519
            .with_prover_capability(TxProvingCapability::ProofCollection);
3520
        let transaction_1: Transaction = state_lock
3521
            .api()
3522
            .tx_initiator_internal()
3523
            .create_transaction(
3524
                Default::default(),
3525
                NativeCurrencyAmount::coins(0),
3526
                now,
3527
                config,
3528
            )
3529
            .await
3530
            .unwrap()
3531
            .transaction
3532
            .into();
3533

3534
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3535
        let mut peer_loop_handler = PeerLoopHandler::new(
3536
            to_main_tx,
3537
            state_lock.clone(),
3538
            get_dummy_socket_address(0),
3539
            hsd_1.clone(),
3540
            true,
3541
            1,
3542
        );
3543
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3544

3545
        assert!(
3546
            state_lock.lock_guard().await.mempool.is_empty(),
3547
            "Mempool must be empty at init"
3548
        );
3549
        state_lock
3550
            .lock_guard_mut()
3551
            .await
3552
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
3553
            .await;
3554
        assert!(
3555
            !state_lock.lock_guard().await.mempool.is_empty(),
3556
            "Mempool must be non-empty after insertion"
3557
        );
3558

3559
        // Run the peer loop and verify expected exchange -- namely that the
3560
        // tx notification is received and the the transaction is *not*
3561
        // requested.
3562
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3563
        let mock = Mock::new(vec![
3564
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3565
            Action::Read(PeerMessage::Bye),
3566
        ]);
3567
        peer_loop_handler
3568
            .run(mock, from_main_rx_clone, &mut peer_state)
3569
            .await
3570
            .unwrap();
3571

3572
        // nothing is allowed to be sent to `main_loop`
3573
        match to_main_rx1.try_recv() {
3574
            Err(TryRecvError::Empty) => (),
3575
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3576
            Ok(_) => panic!("to_main channel must be empty"),
3577
        };
3578
        Ok(())
3579
    }
    mod block_proposals {
3582
        use super::*;
3583
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3584

3585
        struct TestSetup {
3586
            peer_loop_handler: PeerLoopHandler,
3587
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3588
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3589
            peer_state: MutablePeerState,
3590
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3591
            genesis_block: Block,
3592
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3593
        }
3594

3595
        async fn genesis_setup(network: Network) -> TestSetup {
3596
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
3597
                get_test_genesis_setup(network, 0, cli_args::Args::default())
3598
                    .await
3599
                    .unwrap();
3600
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
3601
            let peer_loop_handler = PeerLoopHandler::new(
3602
                to_main_tx.clone(),
3603
                alice.clone(),
3604
                get_dummy_socket_address(0),
3605
                peer_hsd.clone(),
3606
                true,
3607
                1,
3608
            );
3609
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
3610

3611
            // (peer_loop_handler, to_main_rx1)
3612
            TestSetup {
3613
                peer_broadcast_tx,
3614
                peer_loop_handler,
3615
                to_main_rx,
3616
                from_main_rx,
3617
                peer_state,
3618
                to_main_tx,
3619
                genesis_block: Block::genesis(network),
3620
            }
3621
        }
3622

3623
        #[traced_test]
3624
        #[apply(shared_tokio_runtime)]
3625
        async fn accept_block_proposal_height_one() {
3626
            // Node knows genesis block, receives a block proposal for block 1
3627
            // and must accept this. Verify that main loop is informed of block
3628
            // proposal.
3629
            let TestSetup {
3630
                peer_broadcast_tx,
3631
                mut peer_loop_handler,
3632
                mut to_main_rx,
3633
                from_main_rx,
3634
                mut peer_state,
3635
                to_main_tx,
3636
                genesis_block,
3637
            } = genesis_setup(Network::Main).await;
3638
            let block1 = fake_valid_block_for_tests(
3639
                &peer_loop_handler.global_state_lock,
3640
                StdRng::seed_from_u64(5550001).random(),
3641
            )
3642
            .await;
3643

3644
            let mock = Mock::new(vec![
3645
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
3646
                Action::Read(PeerMessage::Bye),
3647
            ]);
3648
            peer_loop_handler
3649
                .run(mock, from_main_rx, &mut peer_state)
3650
                .await
3651
                .unwrap();
3652

3653
            match to_main_rx.try_recv().unwrap() {
3654
                PeerTaskToMain::BlockProposal(block) => {
3655
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
3656
                }
3657
                _ => panic!("Expected main loop to be informed of block proposal"),
3658
            };
3659

3660
            drop(to_main_tx);
3661
            drop(peer_broadcast_tx);
3662
        }
3663

3664
        #[traced_test]
3665
        #[apply(shared_tokio_runtime)]
3666
        async fn accept_block_proposal_notification_height_one() {
3667
            // Node knows genesis block, receives a block proposal notification
3668
            // for block 1 and must accept this by requesting the block
3669
            // proposal from peer.
3670
            let TestSetup {
3671
                peer_broadcast_tx,
3672
                mut peer_loop_handler,
3673
                to_main_rx: _,
3674
                from_main_rx,
3675
                mut peer_state,
3676
                to_main_tx,
3677
                ..
3678
            } = genesis_setup(Network::Main).await;
3679
            let block1 = fake_valid_block_for_tests(
3680
                &peer_loop_handler.global_state_lock,
3681
                StdRng::seed_from_u64(5550001).random(),
3682
            )
3683
            .await;
3684

3685
            let mock = Mock::new(vec![
3686
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
3687
                Action::Write(PeerMessage::BlockProposalRequest(
3688
                    BlockProposalRequest::new(block1.body().mast_hash()),
3689
                )),
3690
                Action::Read(PeerMessage::Bye),
3691
            ]);
3692
            peer_loop_handler
3693
                .run(mock, from_main_rx, &mut peer_state)
3694
                .await
3695
                .unwrap();
3696

3697
            drop(to_main_tx);
3698
            drop(peer_broadcast_tx);
3699
        }
3700
    }
    mod proof_qualities {
3703
        use strum::IntoEnumIterator;
3704

3705
        use super::*;
3706
        use crate::config_models::cli_args;
3707
        use crate::models::blockchain::transaction::Transaction;
3708
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3709
        use crate::models::state::wallet::transaction_output::TxOutput;
3710
        use crate::tests::shared::mock_genesis_global_state;
3711

3712
        async fn tx_of_proof_quality(
3713
            network: Network,
3714
            quality: TransactionProofQuality,
3715
        ) -> Transaction {
3716
            let wallet_secret = WalletEntropy::devnet_wallet();
3717
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
3718
            let alice_gsl =
3719
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
3720
                    .await;
3721
            let alice = alice_gsl.lock_guard().await;
3722
            let genesis_block = alice.chain.light_state();
3723
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
3724
            let prover_capability = match quality {
3725
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
3726
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
3727
            };
3728
            let config = TxCreationConfig::default()
3729
                .recover_change_off_chain(alice_key.into())
3730
                .with_prover_capability(prover_capability);
3731
            alice_gsl
3732
                .api()
3733
                .tx_initiator_internal()
3734
                .create_transaction(
3735
                    Vec::<TxOutput>::new().into(),
3736
                    NativeCurrencyAmount::coins(1),
3737
                    in_seven_months,
3738
                    config,
3739
                )
3740
                .await
3741
                .unwrap()
3742
                .transaction()
3743
                .clone()
3744
        }
3745

3746
        #[traced_test]
3747
        #[apply(shared_tokio_runtime)]
3748
        async fn client_favors_higher_proof_quality() {
3749
            // In this scenario the peer is informed of a transaction that it
3750
            // already knows, and it's tested that it checks the proof quality
3751
            // field and verifies that it exceeds the proof in the mempool
3752
            // before requesting the transasction.
3753
            let network = Network::Main;
3754
            let proof_collection_tx =
3755
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
3756
            let single_proof_tx =
3757
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
3758

3759
            for (own_tx_pq, new_tx_pq) in
3760
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
3761
            {
3762
                use TransactionProofQuality::*;
3763

3764
                let (
3765
                    _peer_broadcast_tx,
3766
                    from_main_rx_clone,
3767
                    to_main_tx,
3768
                    mut to_main_rx1,
3769
                    mut alice,
3770
                    handshake_data,
3771
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3772
                    .await
3773
                    .unwrap();
3774

3775
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
3776
                    (ProofCollection, ProofCollection) => {
3777
                        (&proof_collection_tx, &proof_collection_tx)
3778
                    }
3779
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
3780
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
3781
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
3782
                };
3783

3784
                alice
3785
                    .lock_guard_mut()
3786
                    .await
3787
                    .mempool_insert((*own_tx).to_owned(), TransactionOrigin::Foreign)
3788
                    .await;
3789

3790
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
3791

3792
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
3793
                let mock = if own_proof_is_supreme {
3794
                    Mock::new(vec![
3795
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3796
                        Action::Read(PeerMessage::Bye),
3797
                    ])
3798
                } else {
3799
                    Mock::new(vec![
3800
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3801
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3802
                        Action::Read(PeerMessage::Transaction(Box::new(
3803
                            new_tx.try_into().unwrap(),
3804
                        ))),
3805
                        Action::Read(PeerMessage::Bye),
3806
                    ])
3807
                };
3808

3809
                let now = proof_collection_tx.kernel.timestamp;
3810
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3811
                    to_main_tx,
3812
                    alice.clone(),
3813
                    get_dummy_socket_address(0),
3814
                    handshake_data.clone(),
3815
                    true,
3816
                    1,
3817
                    now,
3818
                );
3819
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
3820

3821
                peer_loop_handler
3822
                    .run(mock, from_main_rx_clone, &mut peer_state)
3823
                    .await
3824
                    .unwrap();
3825

3826
                if own_proof_is_supreme {
3827
                    match to_main_rx1.try_recv() {
3828
                        Err(TryRecvError::Empty) => (),
3829
                        Err(TryRecvError::Disconnected) => {
3830
                            panic!("to_main channel must still be open")
3831
                        }
3832
                        Ok(_) => panic!("to_main channel must be empty"),
3833
                    }
3834
                } else {
3835
                    match to_main_rx1.try_recv() {
3836
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
3837
                        Err(TryRecvError::Disconnected) => {
3838
                            panic!("to_main channel must still be open")
3839
                        }
3840
                        Ok(PeerTaskToMain::Transaction(_)) => (),
3841
                        _ => panic!("Unexpected result from channel"),
3842
                    }
3843
                }
3844
            }
3845
        }
3846
    }
    mod sync_challenges {
3849
        use super::*;
3850
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3851

3852
        #[traced_test]
3853
        #[apply(shared_tokio_runtime)]
3854
        async fn bad_sync_challenge_height_greater_than_tip() {
3855
            // Criterium: Challenge height may not exceed that of tip in the
3856
            // request.
3857

3858
            let network = Network::Main;
3859
            let (
3860
                _alice_main_to_peer_tx,
3861
                alice_main_to_peer_rx,
3862
                alice_peer_to_main_tx,
3863
                alice_peer_to_main_rx,
3864
                mut alice,
3865
                alice_hsd,
3866
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3867
                .await
3868
                .unwrap();
3869
            let genesis_block: Block = Block::genesis(network);
3870
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
3871
                &genesis_block,
3872
                Timestamp::hours(1),
3873
                [0u8; 32],
3874
                network,
3875
            )
3876
            .await;
3877
            for block in &blocks {
3878
                alice.set_new_tip(block.clone()).await.unwrap();
3879
            }
3880

3881
            let bh12 = blocks.last().unwrap().header().height;
3882
            let sync_challenge = SyncChallenge {
3883
                tip_digest: blocks[9].hash(),
3884
                challenges: [bh12; 10],
3885
            };
3886
            let alice_p2p_messages = Mock::new(vec![
3887
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
3888
                Action::Read(PeerMessage::Bye),
3889
            ]);
3890

3891
            let peer_address = get_dummy_socket_address(0);
3892
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
3893
                alice_peer_to_main_tx.clone(),
3894
                alice.clone(),
3895
                peer_address,
3896
                alice_hsd,
3897
                false,
3898
                1,
3899
            );
3900
            alice_peer_loop_handler
3901
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
3902
                .await
3903
                .unwrap();
3904

3905
            drop(alice_peer_to_main_rx);
3906

3907
            let latest_sanction = alice
3908
                .lock_guard()
3909
                .await
3910
                .net
3911
                .get_peer_standing_from_database(peer_address.ip())
3912
                .await
3913
                .unwrap();
3914
            assert_eq!(
3915
                NegativePeerSanction::InvalidSyncChallenge,
3916
                latest_sanction
3917
                    .latest_punishment
3918
                    .expect("peer must be sanctioned")
3919
                    .0
3920
            );
3921
        }
3922

3923
        #[traced_test]
3924
        #[apply(shared_tokio_runtime)]
3925
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
3926
            // Criterium: Challenge may not point to genesis block, or block 1, as
3927
            // tip.
3928

3929
            let network = Network::Main;
3930
            let genesis_block: Block = Block::genesis(network);
3931

3932
            let alice_cli = cli_args::Args::default();
3933
            let (
3934
                _alice_main_to_peer_tx,
3935
                alice_main_to_peer_rx,
3936
                alice_peer_to_main_tx,
3937
                alice_peer_to_main_rx,
3938
                alice,
3939
                alice_hsd,
3940
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
3941

3942
            let sync_challenge = SyncChallenge {
3943
                tip_digest: genesis_block.hash(),
3944
                challenges: [BlockHeight::genesis(); 10],
3945
            };
3946

3947
            let alice_p2p_messages = Mock::new(vec![
3948
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
3949
                Action::Read(PeerMessage::Bye),
3950
            ]);
3951

3952
            let peer_address = get_dummy_socket_address(0);
3953
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
3954
                alice_peer_to_main_tx.clone(),
3955
                alice.clone(),
3956
                peer_address,
3957
                alice_hsd,
3958
                false,
3959
                1,
3960
            );
3961
            alice_peer_loop_handler
3962
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
3963
                .await
3964
                .unwrap();
3965

3966
            drop(alice_peer_to_main_rx);
3967

3968
            let latest_sanction = alice
3969
                .lock_guard()
3970
                .await
3971
                .net
3972
                .get_peer_standing_from_database(peer_address.ip())
3973
                .await
3974
                .unwrap();
3975
            assert_eq!(
3976
                NegativePeerSanction::InvalidSyncChallenge,
3977
                latest_sanction
3978
                    .latest_punishment
3979
                    .expect("peer must be sanctioned")
3980
                    .0
3981
            );
3982
        }
3983

3984
        #[traced_test]
3985
        #[apply(shared_tokio_runtime)]
3986
        async fn sync_challenge_happy_path() -> Result<()> {
3987
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
3988
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
3989
            // sync mode.
3990

3991
            let mut rng = rand::rng();
3992
            let network = Network::Main;
3993
            let genesis_block: Block = Block::genesis(network);
3994

3995
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
3996
            let alice_cli = cli_args::Args {
3997
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
3998
                ..Default::default()
3999
            };
4000
            let (
4001
                _alice_main_to_peer_tx,
4002
                alice_main_to_peer_rx,
4003
                alice_peer_to_main_tx,
4004
                mut alice_peer_to_main_rx,
4005
                mut alice,
4006
                alice_hsd,
4007
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
4008
            let _alice_socket_address = get_dummy_socket_address(0);
4009

4010
            let (
4011
                _bob_main_to_peer_tx,
4012
                _bob_main_to_peer_rx,
4013
                _bob_peer_to_main_tx,
4014
                _bob_peer_to_main_rx,
4015
                mut bob,
4016
                _bob_hsd,
4017
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
4018
            let bob_socket_address = get_dummy_socket_address(0);
4019

4020
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
4021
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
4022
            assert!(
4023
                block_1.is_valid(&genesis_block, now, network).await,
4024
                "Block must be valid for this test to make sense"
4025
            );
4026
            let alice_tip = &block_1;
4027
            alice.set_new_tip(block_1.clone()).await?;
4028
            bob.set_new_tip(block_1.clone()).await?;
4029

4030
            // produce enough blocks to ensure alice needs to go into sync mode
4031
            // with this block notification.
4032
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
4033
                &block_1,
4034
                TARGET_BLOCK_INTERVAL,
4035
                rng.random(),
4036
                network,
4037
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
4038
            )
4039
            .await;
4040
            for block in &blocks {
4041
                bob.set_new_tip(block.clone()).await?;
4042
            }
4043
            let bob_tip = blocks.last().unwrap();
4044

4045
            let block_notification_from_bob = PeerBlockNotification {
4046
                hash: bob_tip.hash(),
4047
                height: bob_tip.header().height,
4048
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
4049
            };
4050

4051
            let alice_rng_seed = rng.random::<[u8; 32]>();
4052
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
4053
            let sync_challenge_from_alice = SyncChallenge::generate(
4054
                &block_notification_from_bob,
4055
                alice_tip.header().height,
4056
                alice_rng_clone.random(),
4057
            );
4058

4059
            println!(
4060
                "sync challenge from alice:\n{:?}",
4061
                sync_challenge_from_alice
4062
            );
4063

4064
            let sync_challenge_response_from_bob = bob
4065
                .lock_guard()
4066
                .await
4067
                .response_to_sync_challenge(sync_challenge_from_alice)
4068
                .await
4069
                .expect("should be able to respond to sync challenge");
4070

4071
            let alice_p2p_messages = Mock::new(vec![
4072
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4073
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
4074
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
4075
                    sync_challenge_response_from_bob,
4076
                ))),
4077
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4078
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
4079
                // The absence of a Write here checks that a 2nd challenge isn't sent
4080
                // when a successful was just received.
4081
                Action::Read(PeerMessage::Bye),
4082
            ]);
4083

4084
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
4085
                alice_peer_to_main_tx.clone(),
4086
                alice.clone(),
4087
                bob_socket_address,
4088
                alice_hsd,
4089
                false,
4090
                1,
4091
                bob_tip.header().timestamp,
4092
            );
4093
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
4094
            alice_peer_loop_handler
4095
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4096
                .await?;
4097

4098
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
4099
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
4100
            expected_anchor_mmra.append(bob_tip.hash());
4101
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
4102
                peer_address: bob_socket_address,
4103
                claimed_height: bob_tip.header().height,
4104
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
4105
                claimed_block_mmra: expected_anchor_mmra,
4106
            };
4107
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
4108
            assert_eq!(
4109
                expected_message_from_alice_peer_loop,
4110
                observed_message_from_alice_peer_loop
4111
            );
4112

4113
            Ok(())
4114
        }
4115
    }
}