• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 15745526366

18 Jun 2025 11:11PM UTC coverage: 71.88% (-0.01%) from 71.891%
15745526366

Pull #617

github

web-flow
Merge 441a87665 into 580610b7e
Pull Request #617: chore: Expose more info on process failed

20263 of 28190 relevant lines covered (71.88%)

488416.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.39
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
// NOTE(review): not referenced in the portion of the file visible here;
// presumably the default number of blocks in a batch response -- confirm.
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
66
// Upper bound on the length of a peer list: responses sent to a peer are
// truncated to this many entries, and a peer sending a longer list is punished.
const MAX_PEER_LIST_LENGTH: usize = 10;
67
// NOTE(review): not referenced in the portion of the file visible here;
// presumably the smallest batch size that is honored -- confirm.
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
68

69
// Return values of the peer message handler: `Ok(true)` means the connection
// should be closed, `Ok(false)` means it should be kept open.
const KEEP_CONNECTION_ALIVE: bool = false;
70
const DISCONNECT_CONNECTION: bool = true;
71

72
// NOTE(review): presumably the numeric type of a peer's standing score; its
// use is not visible in this portion of the file -- confirm.
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP
75
///
76
/// also handles messages from main task over the main-to-peer-tasks broadcast
77
/// channel.
78
#[derive(Debug, Clone)]
79
pub struct PeerLoopHandler {
80
    /// Channel for sending messages from this peer task to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
81
    /// Handle to the shared global state; locked for read or write as needed.
    global_state_lock: GlobalStateLock,
82
    /// Socket address of the peer; also the key of the peer's entry in the
    /// peer map.
    peer_address: SocketAddr,
83
    /// Handshake data of the peer.
    /// NOTE(review): not read in the portion of the file visible here.
    peer_handshake_data: HandshakeData,
84
    /// NOTE(review): presumably `true` if the peer initiated the connection
    /// -- confirm against the caller.
    inbound_connection: bool,
85
    /// Distance of this peer; peers revealed by this peer are reported to the
    /// main task with distance `distance + 1`.
    distance: u8,
86
    /// Random number generator, used e.g. when generating sync challenges.
    /// Can be replaced in tests through `set_rng`.
    rng: StdRng,
87
    #[cfg(test)]
88
    /// Test-only: if set, `now()` returns this timestamp instead of the
    /// current time.
    mock_now: Option<Timestamp>,
89
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
28✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
28✔
94
        global_state_lock: GlobalStateLock,
28✔
95
        peer_address: SocketAddr,
28✔
96
        peer_handshake_data: HandshakeData,
28✔
97
        inbound_connection: bool,
28✔
98
        distance: u8,
28✔
99
    ) -> Self {
28✔
100
        Self {
28✔
101
            to_main_tx,
28✔
102
            global_state_lock,
28✔
103
            peer_address,
28✔
104
            peer_handshake_data,
28✔
105
            inbound_connection,
28✔
106
            distance,
28✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
28✔
108
            #[cfg(test)]
28✔
109
            mock_now: None,
28✔
110
        }
28✔
111
    }
28✔
112

113
    /// Test-only constructor that pins the handler's notion of "now" to
    /// `mocked_time`, so that time-dependent behavior can be tested.
    ///
    /// Apart from the mocked time, the handler is set up like one created by
    /// `new`, including a freshly seeded random number generator.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        let mock_now = Some(mocked_time);
        let rng = StdRng::from_rng(&mut rand::rng());

        Self {
            rng,
            mock_now,
            distance,
            inbound_connection,
            peer_handshake_data,
            peer_address,
            global_state_lock,
            to_main_tx,
        }
    }
20✔
135

136
    /// Replace the handler's random number generator with the given one.
    ///
    /// Intended for tests that need deterministic randomness.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        // The previous generator is discarded.
        drop(std::mem::replace(&mut self.rng, rng));
    }
1✔
143

144
    fn now(&self) -> Timestamp {
54✔
145
        #[cfg(not(test))]
146
        {
147
            Timestamp::now()
27✔
148
        }
149
        #[cfg(test)]
150
        {
151
            self.mock_now.unwrap_or(Timestamp::now())
27✔
152
        }
153
    }
54✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
199
        if sanction_result.is_err() {
18✔
200
            error!("Cannot reward banned peer");
×
201
        }
18✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
204
    }
19✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
255
    /// valid. Use with a list of blocks or a single block. When the
256
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
257
    /// list is the `i`th block. The parent of element zero in this list is
258
    /// `parent_of_first_block`.
259
    ///
260
    /// # Return Value
261
    ///  - `Err` when the connection should be closed;
262
    ///  - `Ok(None)` if some block is invalid
263
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
264
    ///    are not syncing;
265
    ///  - `Ok(None)` if the last block has insufficient height and we are
266
    ///    syncing;
267
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
268
    ///    highest height in the batch.
269
    ///
270
    /// A return value of Ok(Some(_)) means that the message was passed on to
271
    /// main loop.
272
    ///
273
    /// # Locking
274
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
275
    ///     `self.reward(..)`.
276
    ///
277
    /// # Panics
278
    ///
279
    ///  - Panics if called with the empty list.
280
    async fn handle_blocks(
8✔
281
        &mut self,
8✔
282
        received_blocks: Vec<Block>,
8✔
283
        parent_of_first_block: Block,
8✔
284
    ) -> Result<Option<BlockHeight>> {
20✔
285
        debug!(
20✔
286
            "attempting to validate {} {}",
×
287
            received_blocks.len(),
×
288
            if received_blocks.len() == 1 {
×
289
                "block"
×
290
            } else {
291
                "blocks"
×
292
            }
293
        );
294
        let now = self.now();
20✔
295
        debug!("validating with respect to current timestamp {now}");
20✔
296
        let mut previous_block = &parent_of_first_block;
20✔
297
        for new_block in &received_blocks {
48✔
298
            let new_block_has_proof_of_work = new_block.has_proof_of_work(
29✔
299
                self.global_state_lock.cli().network,
29✔
300
                previous_block.header(),
29✔
301
            );
302
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
29✔
303
            let new_block_is_valid = new_block
29✔
304
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
29✔
305
                .await;
29✔
306
            debug!("new block is valid? {new_block_is_valid}");
29✔
307
            if !new_block_has_proof_of_work {
29✔
308
                warn!(
1✔
309
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
310
                    new_block.kernel.header.height, self.peer_address
×
311
                );
312
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
313
                warn!(
1✔
314
                    "Proof of work should be {} (or more) but was [{}].",
×
315
                    previous_block.kernel.header.difficulty.target(),
×
316
                    new_block.hash().values().iter().join(", ")
×
317
                );
318
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
319
                    new_block.kernel.header.height,
1✔
320
                    new_block.hash(),
1✔
321
                )))
1✔
322
                .await?;
1✔
323
                warn!("Failed to validate block due to insufficient PoW");
1✔
324
                return Ok(None);
1✔
325
            } else if !new_block_is_valid {
28✔
326
                warn!(
×
327
                    "Received invalid block of height {} from peer with IP {}",
×
328
                    new_block.kernel.header.height, self.peer_address
×
329
                );
330
                self.punish(NegativePeerSanction::InvalidBlock((
×
331
                    new_block.kernel.header.height,
×
332
                    new_block.hash(),
×
333
                )))
×
334
                .await?;
×
335
                warn!("Failed to validate block: invalid block");
×
336
                return Ok(None);
×
337
            }
28✔
338
            info!(
28✔
339
                "Block with height {} is valid. mined: {}",
×
340
                new_block.kernel.header.height,
×
341
                new_block.kernel.header.timestamp.standard_format()
×
342
            );
343

344
            previous_block = new_block;
28✔
345
        }
346

347
        // evaluate the fork choice rule
348
        debug!("Checking last block's canonicity ...");
19✔
349
        let last_block = received_blocks.last().unwrap();
19✔
350
        let is_canonical = self
19✔
351
            .global_state_lock
19✔
352
            .lock_guard()
19✔
353
            .await
19✔
354
            .incoming_block_is_more_canonical(last_block);
19✔
355
        let last_block_height = last_block.header().height;
19✔
356
        let sync_mode_active_and_have_new_champion = self
19✔
357
            .global_state_lock
19✔
358
            .lock_guard()
19✔
359
            .await
19✔
360
            .net
361
            .sync_anchor
362
            .as_ref()
19✔
363
            .is_some_and(|x| {
19✔
364
                x.champion
×
365
                    .is_none_or(|(height, _)| height < last_block_height)
×
366
            });
×
367
        if !is_canonical && !sync_mode_active_and_have_new_champion {
19✔
368
            warn!(
1✔
369
                "Received {} blocks from peer but incoming blocks are less \
×
370
            canonical than current tip, or current sync-champion.",
×
371
                received_blocks.len()
×
372
            );
373
            return Ok(None);
1✔
374
        }
18✔
375

376
        // Send the new blocks to the main task which handles the state update
377
        // and storage to the database.
378
        let number_of_received_blocks = received_blocks.len();
18✔
379
        self.to_main_tx
18✔
380
            .send(PeerTaskToMain::NewBlocks(received_blocks))
18✔
381
            .await?;
18✔
382
        info!(
18✔
383
            "Updated block info by block from peer. block height {}",
×
384
            last_block_height
385
        );
386

387
        // Valuable, new, hard-to-produce information. Reward peer.
388
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
18✔
389
            .await?;
18✔
390

391
        Ok(Some(last_block_height))
18✔
392
    }
20✔
393

394
    /// Take a single block received from a peer and (attempt to) find a path
395
    /// between the received block and a common ancestor stored in the blocks
396
    /// database.
397
    ///
398
    /// This function attempts to find the parent of the received block, either
399
    /// by searching the database or by requesting it from a peer.
400
    ///  - If the parent is not stored, it is requested from the peer and the
401
    ///    received block is pushed to the fork reconciliation list for later
402
    ///    handling by this function. The fork reconciliation list starts out
403
    ///    empty, but grows as more parents are requested and transmitted.
404
    ///  - If the parent is found in the database, a) block handling continues:
405
    ///    the entire list of fork reconciliation blocks are passed down the
406
    ///    pipeline, potentially leading to a state update; and b) the fork
407
    ///    reconciliation list is cleared.
408
    ///
409
    /// Locking:
410
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
411
    ///     `self.reward(..)`.
412
    async fn try_ensure_path<S>(
32✔
413
        &mut self,
32✔
414
        received_block: Box<Block>,
32✔
415
        peer: &mut S,
32✔
416
        peer_state: &mut MutablePeerState,
32✔
417
    ) -> Result<()>
32✔
418
    where
32✔
419
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
32✔
420
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
32✔
421
        <S as TryStream>::Error: std::error::Error,
32✔
422
    {
32✔
423
        // Does the received block match the fork reconciliation list?
424
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
32✔
425
            peer_state.fork_reconciliation_blocks.last()
32✔
426
        {
427
            let valid = successor
10✔
428
                .is_valid(
10✔
429
                    received_block.as_ref(),
10✔
430
                    self.now(),
10✔
431
                    self.global_state_lock.cli().network,
10✔
432
                )
10✔
433
                .await;
10✔
434
            if !valid {
10✔
435
                warn!(
×
436
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
437
                        peer_state.fork_reconciliation_blocks.len() + 1
×
438
                    );
439
            }
10✔
440
            valid
10✔
441
        } else {
442
            true
22✔
443
        };
444

445
        // Are we running out of RAM?
446
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
32✔
447
            >= self.global_state_lock.cli().sync_mode_threshold;
32✔
448
        if too_many_blocks {
32✔
449
            warn!(
1✔
450
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
451
                peer_state.fork_reconciliation_blocks.len() + 1
×
452
            );
453
        }
31✔
454

455
        // Block mismatch or too many blocks: abort!
456
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
32✔
457
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
458
                received_block.header().height,
1✔
459
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
460
                received_block.hash(),
1✔
461
            )))
1✔
462
            .await?;
1✔
463
            peer_state.fork_reconciliation_blocks = vec![];
1✔
464
            return Ok(());
1✔
465
        }
31✔
466

467
        // otherwise, append
468
        peer_state.fork_reconciliation_blocks.push(*received_block);
31✔
469

470
        // Try fetch parent
471
        let received_block_header = *peer_state
31✔
472
            .fork_reconciliation_blocks
31✔
473
            .last()
31✔
474
            .unwrap()
31✔
475
            .header();
31✔
476

477
        let parent_digest = received_block_header.prev_block_digest;
31✔
478
        let parent_height = received_block_header.height.previous()
31✔
479
            .expect("transferred block must have previous height because genesis block cannot be transferred");
31✔
480
        debug!("Try ensure path: fetching parent block");
31✔
481
        let parent_block = self
31✔
482
            .global_state_lock
31✔
483
            .lock_guard()
31✔
484
            .await
31✔
485
            .chain
486
            .archival_state()
31✔
487
            .get_block(parent_digest)
31✔
488
            .await?;
31✔
489
        debug!(
31✔
490
            "Completed parent block fetching from DB: {}",
×
491
            if parent_block.is_some() {
×
492
                "found".to_string()
×
493
            } else {
494
                "not found".to_string()
×
495
            }
496
        );
497

498
        // If parent is not known (but not genesis) request it.
499
        let Some(parent_block) = parent_block else {
31✔
500
            if parent_height.is_genesis() {
11✔
501
                peer_state.fork_reconciliation_blocks.clear();
1✔
502
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
503
                return Ok(());
×
504
            }
10✔
505
            info!(
10✔
506
                "Parent not known: Requesting previous block with height {} from peer",
×
507
                parent_height
508
            );
509

510
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
511
                .await?;
10✔
512

513
            return Ok(());
10✔
514
        };
515

516
        // We want to treat the received fork reconciliation blocks (plus the
517
        // received block) in reverse order, from oldest to newest, because
518
        // they were requested from high to low block height.
519
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
20✔
520
        new_blocks.reverse();
20✔
521

522
        // Reset the fork resolution state since we got all the way back to a
523
        // block that we have.
524
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
20✔
525
        peer_state.fork_reconciliation_blocks.clear();
20✔
526

527
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
20✔
528
            // If `BlockNotification` was received during a block reconciliation
529
            // event, then the peer might have one (or more (unlikely)) blocks
530
            // that we do not have. We should thus request those blocks.
531
            if fork_reconciliation_event
18✔
532
                && peer_state.highest_shared_block_height > new_block_height
18✔
533
            {
534
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
535
                    peer_state.highest_shared_block_height,
1✔
536
                ))
1✔
537
                .await?;
1✔
538
            }
17✔
539
        }
2✔
540

541
        Ok(())
20✔
542
    }
32✔
543

544
    /// Handle peer messages and returns Ok(true) if connection should be closed.
545
    /// Connection should also be closed if an error is returned.
546
    /// Otherwise, returns OK(false).
547
    ///
548
    /// Locking:
549
    ///   * Acquires `global_state_lock` for read.
550
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
551
    ///     `self.reward(..)`.
552
    async fn handle_peer_message<S>(
147✔
553
        &mut self,
147✔
554
        msg: PeerMessage,
147✔
555
        peer: &mut S,
147✔
556
        peer_state_info: &mut MutablePeerState,
147✔
557
    ) -> Result<bool>
147✔
558
    where
147✔
559
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
147✔
560
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
147✔
561
        <S as TryStream>::Error: std::error::Error,
147✔
562
    {
147✔
563
        debug!(
147✔
564
            "Received {} from peer {}",
×
565
            msg.get_type(),
×
566
            self.peer_address
567
        );
568
        match msg {
147✔
569
            PeerMessage::Bye => {
570
                // Note that the current peer is not removed from the global_state.peer_map here
571
                // but that this is done by the caller.
572
                info!("Got bye. Closing connection to peer");
40✔
573
                Ok(DISCONNECT_CONNECTION)
40✔
574
            }
575
            PeerMessage::PeerListRequest => {
576
                let peer_info = {
5✔
577
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
578

579
                    // We are interested in the address on which peers accept ingoing connections,
580
                    // not in the address in which they are connected to us. We are only interested in
581
                    // peers that accept incoming connections.
582
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
583
                        .global_state_lock
5✔
584
                        .lock_guard()
5✔
585
                        .await
5✔
586
                        .net
587
                        .peer_map
588
                        .values()
5✔
589
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
590
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
591
                        .map(|peer_info| {
8✔
592
                            (
8✔
593
                                // unwrap is safe bc of above `filter`
8✔
594
                                peer_info.listen_address().unwrap(),
8✔
595
                                peer_info.instance_id(),
8✔
596
                            )
8✔
597
                        })
8✔
598
                        .collect();
5✔
599

600
                    // We sort the returned list, so this function is easier to test
601
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
602
                    peer_info
5✔
603
                };
604

605
                debug!("Responding with: {:?}", peer_info);
5✔
606
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
607
                Ok(KEEP_CONNECTION_ALIVE)
5✔
608
            }
609
            PeerMessage::PeerListResponse(peers) => {
3✔
610
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
611

612
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
613
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
614
                        .await?;
×
615
                }
3✔
616
                self.to_main_tx
3✔
617
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
618
                        peers,
3✔
619
                        self.peer_address,
3✔
620
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
621
                        self.distance + 1,
3✔
622
                    )))
3✔
623
                    .await?;
3✔
624
                Ok(KEEP_CONNECTION_ALIVE)
3✔
625
            }
626
            PeerMessage::BlockNotificationRequest => {
627
                debug!("Got BlockNotificationRequest");
×
628

629
                peer.send(PeerMessage::BlockNotification(
×
630
                    self.global_state_lock
×
631
                        .lock_guard()
×
632
                        .await
×
633
                        .chain
634
                        .light_state()
×
635
                        .into(),
×
636
                ))
637
                .await?;
×
638

639
                Ok(KEEP_CONNECTION_ALIVE)
×
640
            }
641
            PeerMessage::BlockNotification(block_notification) => {
16✔
642
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
643

644
                let (tip_header, sync_anchor_is_set) = {
16✔
645
                    let state = self.global_state_lock.lock_guard().await;
16✔
646
                    (
16✔
647
                        *state.chain.light_state().header(),
16✔
648
                        state.net.sync_anchor.is_some(),
16✔
649
                    )
16✔
650
                };
651
                debug!(
16✔
652
                    "Got BlockNotification of height {}. Own height is {}",
×
653
                    block_notification.height, tip_header.height
654
                );
655

656
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
657
                let now = self.now();
16✔
658
                let time_since_latest_successful_challenge = peer_state_info
16✔
659
                    .successful_sync_challenge_response_time
16✔
660
                    .map(|then| now - then);
16✔
661
                let cooldown_expired = time_since_latest_successful_challenge
16✔
662
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
663
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
664
                    &tip_header,
16✔
665
                    block_notification.height,
16✔
666
                    block_notification.cumulative_proof_of_work,
16✔
667
                    sync_mode_threshold,
16✔
668
                );
669
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
670
                    debug!("sync mode criterion satisfied.");
1✔
671

672
                    if peer_state_info.sync_challenge.is_some() {
1✔
673
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
674
                        return Ok(KEEP_CONNECTION_ALIVE);
×
675
                    }
1✔
676

677
                    info!(
1✔
678
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
679
                    );
680
                    let challenge = SyncChallenge::generate(
1✔
681
                        &block_notification,
1✔
682
                        tip_header.height,
1✔
683
                        self.rng.random(),
1✔
684
                    );
685
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
686
                        challenge,
1✔
687
                        block_notification.cumulative_proof_of_work,
1✔
688
                        self.now(),
1✔
689
                    ));
1✔
690

691
                    debug!("sending challenge ...");
1✔
692
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
693

694
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
695
                }
15✔
696

697
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
698
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
699
                    < block_notification.cumulative_proof_of_work;
15✔
700

701
                debug!("block_is_new: {}", block_is_new);
15✔
702

703
                if block_is_new
15✔
704
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
705
                    && !sync_anchor_is_set
14✔
706
                    && !exceeds_sync_mode_threshold
14✔
707
                {
708
                    debug!(
13✔
709
                        "sending BlockRequestByHeight to peer for block with height {}",
×
710
                        block_notification.height
711
                    );
712
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
713
                        .await?;
13✔
714
                } else {
715
                    debug!(
2✔
716
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
717
                        block_notification.height,
718
                        block_is_new,
719
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
720
                    );
721
                }
722

723
                Ok(KEEP_CONNECTION_ALIVE)
15✔
724
            }
725
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
726
                let response = {
×
727
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
728

729
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
730

731
                    let response = self
2✔
732
                        .global_state_lock
2✔
733
                        .lock_guard()
2✔
734
                        .await
2✔
735
                        .response_to_sync_challenge(sync_challenge)
2✔
736
                        .await;
2✔
737

738
                    match response {
2✔
739
                        Ok(resp) => resp,
×
740
                        Err(e) => {
2✔
741
                            warn!("could not generate sync challenge response:\n{e}");
2✔
742
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
743
                                .await?;
2✔
744
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
745
                        }
746
                    }
747
                };
748

749
                info!(
×
750
                    "Responding to sync challenge from {}",
×
751
                    self.peer_address.ip()
×
752
                );
753
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
754
                    .await?;
×
755

756
                Ok(KEEP_CONNECTION_ALIVE)
×
757
            }
758
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
759
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
760

761
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
762
                info!(
1✔
763
                    "Got sync challenge response from {}",
×
764
                    self.peer_address.ip()
×
765
                );
766

767
                // The purpose of the sync challenge and sync challenge response
768
                // is to avoid going into sync mode based on a malicious target
769
                // fork. Instead of verifying that the claimed proof-of-work
770
                // number is correct (which would require sending and verifying,
771
                // at least, all blocks between LUCA (the latest common ancestor block) and the
772
                // claimed tip), we use a heuristic that requires less
773
                // communication and less verification work. The downside of
774
                // using a heuristic here is a nonzero false positive and false
775
                // negative rate. Note that the false negative event
776
                // (maliciously sending someone into sync mode based on a bogus
777
                // fork) still requires a significant amount of work from the
778
                // attacker, *in addition* to being lucky. Also, down the line
779
                // succinctness (and more specifically, recursive block
780
                // validation) obviates this entire subprotocol.
781

782
                // Did we issue a challenge?
783
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
784
                    warn!("Sync challenge response was not prompted.");
×
785
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
786
                        .await?;
×
787
                    return Ok(KEEP_CONNECTION_ALIVE);
×
788
                };
789

790
                // Reset the challenge, regardless of the response's success.
791
                peer_state_info.sync_challenge = None;
1✔
792

793
                // Does response match issued challenge?
794
                if !challenge_response
1✔
795
                    .matches(self.global_state_lock.cli().network, issued_challenge)
1✔
796
                {
797
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
798
                        .await?;
×
799
                    return Ok(KEEP_CONNECTION_ALIVE);
×
800
                }
1✔
801

802
                // Does response verify?
803
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
804
                let now = self.now();
1✔
805
                if !challenge_response
1✔
806
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
807
                    .await
1✔
808
                {
809
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
810
                        .await?;
×
811
                    return Ok(KEEP_CONNECTION_ALIVE);
×
812
                }
1✔
813

814
                // Does cumulative proof-of-work evolve reasonably?
815
                let own_tip_header = *self
1✔
816
                    .global_state_lock
1✔
817
                    .lock_guard()
1✔
818
                    .await
1✔
819
                    .chain
820
                    .light_state()
1✔
821
                    .header();
1✔
822
                if !challenge_response
1✔
823
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
824
                {
825
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
826
                        .await?;
×
827
                    return Ok(KEEP_CONNECTION_ALIVE);
×
828
                }
1✔
829

830
                // Is there some specific (*i.e.*, not aggregate) proof of work?
831
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
832
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
833
                        .await?;
×
834
                    return Ok(KEEP_CONNECTION_ALIVE);
×
835
                }
1✔
836

837
                // Did it come in time?
838
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
839
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
840
                        .await?;
×
841
                    return Ok(KEEP_CONNECTION_ALIVE);
×
842
                }
1✔
843

844
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
845
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
846

847
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
848
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
849

850
                // Inform main loop
851
                self.to_main_tx
1✔
852
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
853
                        peer_address: self.peer_address,
1✔
854
                        claimed_height: claimed_tip_height,
1✔
855
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
856
                        claimed_block_mmra: sync_mmra_anchor,
1✔
857
                    })
1✔
858
                    .await?;
1✔
859

860
                Ok(KEEP_CONNECTION_ALIVE)
1✔
861
            }
862
            PeerMessage::BlockRequestByHash(block_digest) => {
×
863
                let block = self
×
864
                    .global_state_lock
×
865
                    .lock_guard()
×
866
                    .await
×
867
                    .chain
868
                    .archival_state()
×
869
                    .get_block(block_digest)
×
870
                    .await?;
×
871

872
                match block {
×
873
                    None => {
874
                        // TODO: Consider punishing here
875
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
876
                        Ok(KEEP_CONNECTION_ALIVE)
×
877
                    }
878
                    Some(b) => {
×
879
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
880
                            .await?;
×
881
                        Ok(KEEP_CONNECTION_ALIVE)
×
882
                    }
883
                }
884
            }
885
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
886
                let block_response = {
15✔
887
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
888

889
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
890

891
                    let canonical_block_digest = self
16✔
892
                        .global_state_lock
16✔
893
                        .lock_guard()
16✔
894
                        .await
16✔
895
                        .chain
896
                        .archival_state()
16✔
897
                        .archival_block_mmr
898
                        .ammr()
16✔
899
                        .try_get_leaf(block_height.into())
16✔
900
                        .await;
16✔
901

902
                    let Some(canonical_block_digest) = canonical_block_digest else {
16✔
903
                        let own_tip_height = self
1✔
904
                            .global_state_lock
1✔
905
                            .lock_guard()
1✔
906
                            .await
1✔
907
                            .chain
908
                            .light_state()
1✔
909
                            .header()
1✔
910
                            .height;
911
                        warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
912
                        self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
913
                            .await?;
1✔
914

915
                        return Ok(KEEP_CONNECTION_ALIVE);
1✔
916
                    };
917

918
                    let canonical_chain_block: Block = self
15✔
919
                        .global_state_lock
15✔
920
                        .lock_guard()
15✔
921
                        .await
15✔
922
                        .chain
923
                        .archival_state()
15✔
924
                        .get_block(canonical_block_digest)
15✔
925
                        .await?
15✔
926
                        .unwrap();
15✔
927

928
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
929
                };
930

931
                debug!("Sending block");
15✔
932
                peer.send(block_response).await?;
15✔
933
                debug!("Sent block");
15✔
934
                Ok(KEEP_CONNECTION_ALIVE)
15✔
935
            }
936
            PeerMessage::Block(t_block) => {
32✔
937
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
938

939
                info!(
32✔
940
                    "Got new block from peer {}, height {}, mined {}",
×
941
                    self.peer_address,
942
                    t_block.header.height,
943
                    t_block.header.timestamp.standard_format()
×
944
                );
945
                let new_block_height = t_block.header.height;
32✔
946

947
                let block = match Block::try_from(*t_block) {
32✔
948
                    Ok(block) => Box::new(block),
32✔
949
                    Err(e) => {
×
950
                        warn!("Peer sent invalid block: {e:?}");
×
951
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
952
                            .await?;
×
953

954
                        return Ok(KEEP_CONNECTION_ALIVE);
×
955
                    }
956
                };
957

958
                // Update the value for the highest known height that peer possesses iff
959
                // we are not in a fork reconciliation state.
960
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
961
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
962
                }
22✔
963

964
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
965

966
                // Reward happens as part of `try_ensure_path`
967

968
                Ok(KEEP_CONNECTION_ALIVE)
31✔
969
            }
970
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
971
                known_blocks,
8✔
972
                max_response_len,
8✔
973
                anchor,
8✔
974
            }) => {
975
                debug!(
8✔
976
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
977
                    self.peer_address
978
                );
979

980
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
981
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
982
                        .await?;
×
983

984
                    return Ok(KEEP_CONNECTION_ALIVE);
×
985
                }
8✔
986

987
                // The last block in the list of the peer's known blocks is the
988
                // earliest block, block with lowest height, the peer has
989
                // requested. If it does not belong to the canonical chain, none of
990
                // the later ones will. So we can do an early abort in that case.
991
                let least_preferred = match known_blocks.last() {
8✔
992
                    Some(least_preferred) => *least_preferred,
8✔
993
                    None => {
994
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
995
                            .await?;
×
996

997
                        return Ok(KEEP_CONNECTION_ALIVE);
×
998
                    }
999
                };
1000

1001
                let state = self.global_state_lock.lock_guard().await;
8✔
1002
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1003
                let luca_is_known = state
8✔
1004
                    .chain
8✔
1005
                    .archival_state()
8✔
1006
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1007
                    .await;
8✔
1008
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1009
                    drop(state);
×
1010
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1011
                        .await?;
×
1012
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1013

1014
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1015
                }
8✔
1016

1017
                // Happy case: At least *one* of the blocks referenced by peer
1018
                // is known to us.
1019
                let first_block_in_response = {
8✔
1020
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1021
                    for block_digest in known_blocks {
10✔
1022
                        if state
10✔
1023
                            .chain
10✔
1024
                            .archival_state()
10✔
1025
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1026
                            .await
10✔
1027
                        {
1028
                            let height = state
8✔
1029
                                .chain
8✔
1030
                                .archival_state()
8✔
1031
                                .get_block_header(block_digest)
8✔
1032
                                .await
8✔
1033
                                .unwrap()
8✔
1034
                                .height;
1035
                            first_block_in_response = Some(height);
8✔
1036
                            debug!(
8✔
1037
                                "Found block in canonical chain for batch response: {}",
×
1038
                                block_digest
1039
                            );
1040
                            break;
8✔
1041
                        }
2✔
1042
                    }
1043

1044
                    first_block_in_response
8✔
1045
                        .expect("existence of LUCA should have been established already.")
8✔
1046
                };
1047

1048
                debug!(
8✔
1049
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1050
                 Now building response from that height."
×
1051
                );
1052

1053
                // Get the relevant blocks, at most batch-size many, descending from the
1054
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1055
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1056
                let max_response_len = cmp::min(
8✔
1057
                    max_response_len,
8✔
1058
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1059
                );
1060
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1061
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1062

1063
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1064
                let response_start_height: u64 = first_block_in_response.into();
8✔
1065
                let mut i: u64 = 1;
8✔
1066
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1067
                    let block_height = response_start_height + i;
31✔
1068
                    match state
31✔
1069
                        .chain
31✔
1070
                        .archival_state()
31✔
1071
                        .archival_block_mmr
31✔
1072
                        .ammr()
31✔
1073
                        .try_get_leaf(block_height)
31✔
1074
                        .await
31✔
1075
                    {
1076
                        Some(digest) => {
23✔
1077
                            digests_of_returned_blocks.push(digest);
23✔
1078
                        }
23✔
1079
                        None => break,
8✔
1080
                    }
1081
                    i += 1;
23✔
1082
                }
1083

1084
                let mut returned_blocks: Vec<Block> =
8✔
1085
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1086
                for block_digest in digests_of_returned_blocks {
31✔
1087
                    let block = state
23✔
1088
                        .chain
23✔
1089
                        .archival_state()
23✔
1090
                        .get_block(block_digest)
23✔
1091
                        .await?
23✔
1092
                        .unwrap();
23✔
1093
                    returned_blocks.push(block);
23✔
1094
                }
1095

1096
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1097

1098
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1099
                drop(state);
8✔
1100

1101
                let Some(response) = response else {
8✔
1102
                    warn!("Unable to satisfy batch-block request");
×
1103
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1104
                        .await?;
×
1105
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1106
                };
1107

1108
                debug!("Returning {} blocks in batch response", response.len());
8✔
1109

1110
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1111
                peer.send(response).await?;
8✔
1112

1113
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1114
            }
1115
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1116
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1117

1118
                debug!(
×
1119
                    "handling block response batch with {} blocks",
×
1120
                    authenticated_blocks.len()
×
1121
                );
1122

1123
                // (Alan:) why is there even a minimum?
1124
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1125
                    warn!("Got smaller batch response than allowed");
×
1126
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1127
                        .await?;
×
1128
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1129
                }
×
1130

1131
                // Verify that we are in fact in syncing mode
1132
                // TODO: Separate peer messages into those allowed under syncing
1133
                // and those that are not
1134
                let Some(sync_anchor) = self
×
1135
                    .global_state_lock
×
1136
                    .lock_guard()
×
1137
                    .await
×
1138
                    .net
1139
                    .sync_anchor
1140
                    .clone()
×
1141
                else {
1142
                    warn!("Received a batch of blocks without being in syncing mode");
×
1143
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1144
                        .await?;
×
1145
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1146
                };
1147

1148
                // Verify that the response matches the current state
1149
                // We get the latest block from the DB here since this message is
1150
                // only valid for archival nodes.
1151
                let (first_block, _) = &authenticated_blocks[0];
×
1152
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1153
                let most_canonical_own_block_match: Option<Block> = self
×
1154
                    .global_state_lock
×
1155
                    .lock_guard()
×
1156
                    .await
×
1157
                    .chain
1158
                    .archival_state()
×
1159
                    .get_block(first_blocks_parent_digest)
×
1160
                    .await
×
1161
                    .expect("Block lookup must succeed");
×
1162
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1163
                    Some(block) => block,
×
1164
                    None => {
1165
                        warn!("Got batch response with invalid start block");
×
1166
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1167
                            .await?;
×
1168
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1169
                    }
1170
                };
1171

1172
                // Convert all blocks to Block objects
1173
                debug!(
×
1174
                    "Found own block of height {} to match received batch",
×
1175
                    most_canonical_own_block_match.kernel.header.height
×
1176
                );
1177
                let mut received_blocks = vec![];
×
1178
                for (t_block, membership_proof) in authenticated_blocks {
×
1179
                    let Ok(block) = Block::try_from(t_block) else {
×
1180
                        warn!("Received invalid transfer block from peer");
×
1181
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1182
                            .await?;
×
1183
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1184
                    };
1185

1186
                    if !membership_proof.verify(
×
1187
                        block.header().height.into(),
×
1188
                        block.hash(),
×
1189
                        &sync_anchor.block_mmr.peaks(),
×
1190
                        sync_anchor.block_mmr.num_leafs(),
×
1191
                    ) {
×
1192
                        warn!("Authentication of received block fails relative to anchor");
×
1193
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1194
                            .await?;
×
1195
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1196
                    }
×
1197

1198
                    received_blocks.push(block);
×
1199
                }
1200

1201
                // Get the latest block that we know of and handle all received blocks
1202
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1203
                    .await?;
×
1204

1205
                // Reward happens as part of `handle_blocks`.
1206

1207
                Ok(KEEP_CONNECTION_ALIVE)
×
1208
            }
1209
            PeerMessage::UnableToSatisfyBatchRequest => {
1210
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1211
                warn!(
×
1212
                    "Peer {} reports inability to satisfy batch request.",
×
1213
                    self.peer_address
1214
                );
1215

1216
                Ok(KEEP_CONNECTION_ALIVE)
×
1217
            }
1218
            PeerMessage::Handshake(_) => {
1219
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1220

1221
                // The handshake should have been sent during connection
1222
                // initialization. Here it is out of order at best, malicious at
1223
                // worst.
1224
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1225
                Ok(KEEP_CONNECTION_ALIVE)
×
1226
            }
1227
            PeerMessage::ConnectionStatus(_) => {
1228
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1229

1230
                // The connection status should have been sent during connection
1231
                // initialization. Here it is out of order at best, malicious at
1232
                // worst.
1233

1234
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1235
                Ok(KEEP_CONNECTION_ALIVE)
×
1236
            }
1237
            PeerMessage::Transaction(transaction) => {
5✔
1238
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
5✔
1239

1240
                debug!(
5✔
1241
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1242
                    transaction.kernel.inputs.len(),
×
1243
                    transaction.kernel.outputs.len(),
×
1244
                    transaction.kernel.mutator_set_hash
×
1245
                );
1246

1247
                let transaction: Transaction = (*transaction).into();
5✔
1248

1249
                // 1. If transaction is invalid, punish.
1250
                if !transaction
5✔
1251
                    .is_valid(self.global_state_lock.cli().network)
5✔
1252
                    .await
5✔
1253
                {
1254
                    warn!("Received invalid tx");
×
1255
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1256
                        .await?;
×
1257
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1258
                }
5✔
1259

1260
                // 2. If transaction has coinbase, punish.
1261
                // Transactions received from peers have not been mined yet.
1262
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
1263
                if transaction.kernel.coinbase.is_some() {
5✔
1264
                    warn!("Received non-mined transaction with coinbase.");
×
1265
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1266
                        .await?;
×
1267
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1268
                }
5✔
1269

1270
                // 3. If negative fee, punish.
1271
                if transaction.kernel.fee.is_negative() {
5✔
1272
                    warn!("Received negative-fee transaction.");
×
1273
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1274
                        .await?;
×
1275
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1276
                }
5✔
1277

1278
                // 4. If transaction is already known, ignore.
1279
                if self
5✔
1280
                    .global_state_lock
5✔
1281
                    .lock_guard()
5✔
1282
                    .await
5✔
1283
                    .mempool
1284
                    .contains_with_higher_proof_quality(
5✔
1285
                        transaction.kernel.txid(),
5✔
1286
                        transaction.proof.proof_quality()?,
5✔
1287
                    )
1288
                {
1289
                    warn!("Received transaction that was already known");
×
1290

1291
                    // We received a transaction that we *probably* haven't requested.
1292
                    // Consider punishing here, if this is abused.
1293
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1294
                }
5✔
1295

1296
                // 5. if transaction is not confirmable, punish.
1297
                let (tip, mutator_set_accumulator_after) = {
5✔
1298
                    let state = self.global_state_lock.lock_guard().await;
5✔
1299

1300
                    (
5✔
1301
                        state.chain.light_state().hash(),
5✔
1302
                        state
5✔
1303
                            .chain
5✔
1304
                            .light_state()
5✔
1305
                            .mutator_set_accumulator_after()
5✔
1306
                            .expect("Block from state must have mutator set after"),
5✔
1307
                    )
5✔
1308
                };
1309
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
5✔
1310
                    warn!(
×
1311
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1312
                        transaction.kernel.txid()
×
1313
                    );
1314
                    // get fine-grained error code for informative logging
1315
                    let confirmability_error_code = transaction
×
1316
                        .kernel
×
1317
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1318
                    match confirmability_error_code {
×
1319
                        Ok(_) => unreachable!(),
×
1320
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1321
                            warn!("invalid removal record (at index {index})");
×
1322
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1323
                            let removal_record_error_code = invalid_removal_record
×
1324
                                .validate_inner(&mutator_set_accumulator_after);
×
1325
                            debug!(
×
1326
                                "Absolute index set of removal record {index}: {:?}",
×
1327
                                invalid_removal_record.absolute_indices
1328
                            );
1329
                            match removal_record_error_code {
×
1330
                                Ok(_) => unreachable!(),
×
1331
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1332
                                    debug!("invalid because membership proof is missing");
×
1333
                                }
1334
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1335
                                    chunk_index,
×
1336
                                }) => {
1337
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1338
                                }
1339
                            };
1340
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1341
                                .await?;
×
1342
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1343
                        }
1344
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1345
                            warn!("duplicate inputs");
×
1346
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1347
                                .await?;
×
1348
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1349
                        }
1350
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1351
                            warn!("already spent input (at index {index})");
×
1352
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1353
                                .await?;
×
1354
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1355
                        }
1356
                    };
1357
                }
5✔
1358

1359
                // If transaction cannot be applied to mutator set, punish.
1360
                // I don't think this can happen when above checks pass but we include
1361
                // the check to ensure that transaction can be applied.
1362
                let ms_update = MutatorSetUpdate::new(
5✔
1363
                    transaction.kernel.inputs.clone(),
5✔
1364
                    transaction.kernel.outputs.clone(),
5✔
1365
                );
1366
                let can_apply = ms_update
5✔
1367
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
5✔
1368
                    .is_ok();
5✔
1369
                if !can_apply {
5✔
1370
                    warn!("Cannot apply transaction to current mutator set.");
×
1371
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1372
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1373
                        .await?;
×
1374
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1375
                }
5✔
1376

1377
                let tx_timestamp = transaction.kernel.timestamp;
5✔
1378

1379
                // 6. Ignore if transaction is too old
1380
                let now = self.now();
5✔
1381
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
5✔
1382
                    // TODO: Consider punishing here
1383
                    warn!("Received too old tx");
×
1384
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1385
                }
5✔
1386

1387
                // 7. Ignore if transaction is too far into the future
1388
                if tx_timestamp
5✔
1389
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
5✔
1390
                {
1391
                    // TODO: Consider punishing here
1392
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1393
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1394
                }
5✔
1395

1396
                // Otherwise, relay to main
1397
                let pt2m_transaction = PeerTaskToMainTransaction {
5✔
1398
                    transaction,
5✔
1399
                    confirmable_for_block: tip,
5✔
1400
                };
5✔
1401
                self.to_main_tx
5✔
1402
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
5✔
1403
                    .await?;
5✔
1404

1405
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1406
            }
1407
            PeerMessage::TransactionNotification(tx_notification) => {
12✔
1408
                // addresses #457
1409
                // new scope for state read-lock to avoid holding across peer.send()
1410
                {
1411
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
12✔
1412

1413
                    // 1. Ignore if we already know this transaction, and
1414
                    // the proof quality is not higher than what we already know.
1415
                    let state = self.global_state_lock.lock_guard().await;
12✔
1416
                    let transaction_of_same_or_higher_proof_quality_is_known =
12✔
1417
                        state.mempool.contains_with_higher_proof_quality(
12✔
1418
                            tx_notification.txid,
12✔
1419
                            tx_notification.proof_quality,
12✔
1420
                        );
1421
                    if transaction_of_same_or_higher_proof_quality_is_known {
12✔
1422
                        debug!("transaction with same or higher proof quality was already known");
7✔
1423
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1424
                    }
5✔
1425

1426
                    // Only accept transactions that do not require executing
1427
                    // `update`.
1428
                    if state
5✔
1429
                        .chain
5✔
1430
                        .light_state()
5✔
1431
                        .mutator_set_accumulator_after()
5✔
1432
                        .expect("Block from state must have mutator set after")
5✔
1433
                        .hash()
5✔
1434
                        != tx_notification.mutator_set_hash
5✔
1435
                    {
1436
                        debug!("transaction refers to non-canonical mutator set state");
×
1437
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1438
                    }
5✔
1439
                }
1440

1441
                // 2. Request the actual `Transaction` from peer
1442
                debug!("requesting transaction from peer");
5✔
1443
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
5✔
1444
                    .await?;
5✔
1445

1446
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1447
            }
1448
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1449
                let state = self.global_state_lock.lock_guard().await;
5✔
1450
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1451
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1452
                };
1453

1454
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1455
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1456
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1457
                };
1458

1459
                // Drop state immediately to prevent holding over a response.
1460
                drop(state);
4✔
1461

1462
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1463
                    .await?;
4✔
1464

1465
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1466
            }
1467
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1468
                let verdict = self
1✔
1469
                    .global_state_lock
1✔
1470
                    .lock_guard()
1✔
1471
                    .await
1✔
1472
                    .favor_incoming_block_proposal_legacy(
1✔
1473
                        block_proposal_notification.height,
1✔
1474
                        block_proposal_notification.guesser_fee,
1✔
1475
                    );
1476
                match verdict {
1✔
1477
                    Ok(_) => {
1478
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1479
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1480
                        ))
1✔
1481
                        .await?
1✔
1482
                    }
1483
                    Err(reject_reason) => {
×
1484
                        info!(
×
1485
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1486
                        {}. Reason:\n{reject_reason}",
×
1487
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1488
                        self.peer_address
1489
                    )
1490
                    }
1491
                }
1492

1493
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1494
            }
1495
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1496
                let matching_proposal = self
×
1497
                    .global_state_lock
×
1498
                    .lock_guard()
×
1499
                    .await
×
1500
                    .mining_state
1501
                    .block_proposal
1502
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1503
                    .map(|x| x.to_owned());
×
1504
                if let Some(proposal) = matching_proposal {
×
1505
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1506
                        .await?;
×
1507
                } else {
1508
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1509
                        .await?;
×
1510
                }
1511

1512
                Ok(KEEP_CONNECTION_ALIVE)
×
1513
            }
1514
            PeerMessage::BlockProposal(new_proposal) => {
1✔
1515
                info!("Got block proposal from peer.");
1✔
1516

1517
                // Is the proposal valid?
1518
                // Lock needs to be held here because race conditions: otherwise
1519
                // the block proposal that was validated might not match with
1520
                // the one whose favorability is being computed.
1521
                let state = self.global_state_lock.lock_guard().await;
1✔
1522
                let tip = state.chain.light_state();
1✔
1523
                let proposal_is_valid = new_proposal
1✔
1524
                    .is_valid(tip, self.now(), self.global_state_lock.cli().network)
1✔
1525
                    .await;
1✔
1526
                if !proposal_is_valid {
1✔
1527
                    drop(state);
×
1528
                    self.punish(NegativePeerSanction::InvalidBlockProposal)
×
1529
                        .await?;
×
1530
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1531
                }
1✔
1532

1533
                // Is block proposal favorable?
1534
                let is_favorable = state.favor_incoming_block_proposal(
1✔
1535
                    new_proposal.header().prev_block_digest,
1✔
1536
                    new_proposal
1✔
1537
                        .total_guesser_reward()
1✔
1538
                        .expect("Block was validated"),
1✔
1539
                );
1✔
1540
                drop(state);
1✔
1541

1542
                if let Err(rejection_reason) = is_favorable {
1✔
1543
                    match rejection_reason {
×
1544
                        // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1545
                        BlockProposalRejectError::InsufficientFee { current, received }
×
1546
                            if Some(received) == current =>
×
1547
                        {
1548
                            debug!("ignoring new block proposal because the fee is equal to the present one");
×
1549
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1550
                        }
1551
                        _ => {
1552
                            warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1553
                            self.punish(NegativePeerSanction::NonFavorableBlockProposal)
×
1554
                                .await?;
×
1555
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1556
                        }
1557
                    }
1558
                };
1✔
1559

1560
                self.send_to_main(PeerTaskToMain::BlockProposal(new_proposal), line!())
1✔
1561
                    .await?;
1✔
1562

1563
                // Valuable, new, hard-to-produce information. Reward peer.
1564
                self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1565

1566
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1567
            }
1568
        }
1569
    }
147✔
1570

1571
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1572
    ///
1573
    /// the channel could potentially fill up in which case the send() will
1574
    /// block until there is capacity.  we wrap the send() so we can log if
1575
    /// that ever happens to the extent it passes slow-scope threshold.
1576
    async fn send_to_main(
1✔
1577
        &self,
1✔
1578
        msg: PeerTaskToMain,
1✔
1579
        line: u32,
1✔
1580
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1581
        // we measure across the send() in case the channel ever fills up.
1582
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1583

1584
        self.to_main_tx.send(msg).await
1✔
1585
    }
1✔
1586

1587
    /// Handle message from main task. The boolean return value indicates if
1588
    /// the connection should be closed.
1589
    ///
1590
    /// Locking:
1591
    ///   * acquires `global_state_lock` for write via Self::punish()
1592
    async fn handle_main_task_message<S>(
29✔
1593
        &mut self,
29✔
1594
        msg: MainToPeerTask,
29✔
1595
        peer: &mut S,
29✔
1596
        peer_state_info: &mut MutablePeerState,
29✔
1597
    ) -> Result<bool>
29✔
1598
    where
29✔
1599
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
29✔
1600
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
29✔
1601
        <S as TryStream>::Error: std::error::Error,
29✔
1602
    {
29✔
1603
        debug!("Handling {} message from main in peer loop", msg.get_type());
29✔
1604
        match msg {
29✔
1605
            MainToPeerTask::Block(block) => {
23✔
1606
                // We don't currently differentiate whether a new block came from a peer, or from our
1607
                // own miner. It's always shared through this logic.
1608
                let new_block_height = block.kernel.header.height;
23✔
1609
                if new_block_height > peer_state_info.highest_shared_block_height {
23✔
1610
                    debug!("Sending PeerMessage::BlockNotification");
12✔
1611
                    peer_state_info.highest_shared_block_height = new_block_height;
12✔
1612
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
12✔
1613
                        .await?;
12✔
1614
                    debug!("Sent PeerMessage::BlockNotification");
12✔
1615
                }
11✔
1616
                Ok(KEEP_CONNECTION_ALIVE)
23✔
1617
            }
1618
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1619
                // Only ask one of the peers about the batch of blocks
1620
                if batch_block_request.peer_addr_target != self.peer_address {
×
1621
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1622
                }
×
1623

1624
                let max_response_len = std::cmp::min(
×
1625
                    STANDARD_BLOCK_BATCH_SIZE,
1626
                    self.global_state_lock.cli().sync_mode_threshold,
×
1627
                );
1628

1629
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1630
                    known_blocks: batch_block_request.known_blocks,
×
1631
                    max_response_len,
×
1632
                    anchor: batch_block_request.anchor_mmr,
×
1633
                }))
×
1634
                .await?;
×
1635

1636
                Ok(KEEP_CONNECTION_ALIVE)
×
1637
            }
1638
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1639
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1640

1641
                if self.peer_address != socket_addr {
×
1642
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1643
                }
×
1644

1645
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1646
                    .await?;
×
1647

1648
                // If this peer failed the last synchronization attempt, we only
1649
                // sanction, we don't disconnect.
1650
                Ok(KEEP_CONNECTION_ALIVE)
×
1651
            }
1652
            MainToPeerTask::MakePeerDiscoveryRequest => {
1653
                peer.send(PeerMessage::PeerListRequest).await?;
×
1654
                Ok(KEEP_CONNECTION_ALIVE)
×
1655
            }
1656
            MainToPeerTask::Disconnect(peer_address) => {
×
1657
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1658

1659
                // Only disconnect from the peer the main task requested a disconnect for.
1660
                if peer_address != self.peer_address {
×
1661
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1662
                }
×
1663
                self.register_peer_disconnection().await;
×
1664

1665
                Ok(DISCONNECT_CONNECTION)
×
1666
            }
1667
            MainToPeerTask::DisconnectAll() => {
1668
                self.register_peer_disconnection().await;
×
1669

1670
                Ok(DISCONNECT_CONNECTION)
×
1671
            }
1672
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1673
                if target_socket_addr == self.peer_address {
×
1674
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1675
                }
×
1676
                Ok(KEEP_CONNECTION_ALIVE)
×
1677
            }
1678
            MainToPeerTask::TransactionNotification(transaction_notification) => {
6✔
1679
                debug!("Sending PeerMessage::TransactionNotification");
6✔
1680
                peer.send(PeerMessage::TransactionNotification(
6✔
1681
                    transaction_notification,
6✔
1682
                ))
6✔
1683
                .await?;
6✔
1684
                debug!("Sent PeerMessage::TransactionNotification");
6✔
1685
                Ok(KEEP_CONNECTION_ALIVE)
6✔
1686
            }
1687
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1688
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1689
                peer.send(PeerMessage::BlockProposalNotification(
×
1690
                    block_proposal_notification,
×
1691
                ))
×
1692
                .await?;
×
1693
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1694
                Ok(KEEP_CONNECTION_ALIVE)
×
1695
            }
1696
        }
1697
    }
29✔
1698

1699
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1700
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1701
    async fn run<S>(
47✔
1702
        &mut self,
47✔
1703
        mut peer: S,
47✔
1704
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
47✔
1705
        peer_state_info: &mut MutablePeerState,
47✔
1706
    ) -> Result<()>
47✔
1707
    where
47✔
1708
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
47✔
1709
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
47✔
1710
        <S as TryStream>::Error: std::error::Error,
47✔
1711
    {
47✔
1712
        loop {
1713
            select! {
182✔
1714
                // Handle peer messages
1715
                peer_message = peer.try_next() => {
182✔
1716
                    let peer_address = self.peer_address;
147✔
1717
                    let peer_message = match peer_message {
147✔
1718
                        Ok(message) => message,
147✔
1719
                        Err(err) => {
×
1720
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
1721
                            error!("{msg}. Error: {err}");
×
1722
                            bail!("{msg}. Closing connection.");
×
1723
                        }
1724
                    };
1725
                    let Some(peer_message) = peer_message else {
147✔
1726
                        info!("Peer {peer_address} closed connection.");
×
1727
                        break;
×
1728
                    };
1729

1730
                    let syncing =
147✔
1731
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
147✔
1732
                    let message_type = peer_message.get_type();
147✔
1733
                    if peer_message.ignore_during_sync() && syncing {
147✔
1734
                        debug!(
×
1735
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1736
                        );
1737
                        continue;
×
1738
                    }
147✔
1739
                    if peer_message.ignore_when_not_sync() && !syncing {
147✔
1740
                        debug!(
×
1741
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1742
                        );
1743
                        continue;
×
1744
                    }
147✔
1745

1746
                    match self
147✔
1747
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
147✔
1748
                        .await
147✔
1749
                    {
1750
                        Ok(false) => {}
106✔
1751
                        Ok(true) => {
1752
                            info!("Closing connection to {peer_address}");
40✔
1753
                            break;
40✔
1754
                        }
1755
                        Err(err) => {
1✔
1756
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1757
                            bail!("{err}");
1✔
1758
                        }
1759
                    };
1760
                }
1761

1762
                // Handle messages from main task
1763
                main_msg_res = from_main_rx.recv() => {
182✔
1764
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
29✔
1765
                        let err_msg = format!("Failed to read from main loop: {err}");
×
1766
                        error!(err_msg);
×
1767
                        panic!("{err_msg}");
×
1768
                    });
1769
                    let close_connection = self
29✔
1770
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
29✔
1771
                        .await
29✔
1772
                        .unwrap_or_else(|err| {
29✔
1773
                            warn!("handle_main_task_message returned an error: {err}");
×
1774
                            true
×
1775
                        });
×
1776

1777
                    if close_connection {
29✔
1778
                        info!(
×
1779
                            "handle_main_task_message is closing the connection to {}",
×
1780
                            self.peer_address
1781
                        );
1782
                        break;
×
1783
                    }
29✔
1784
                }
1785
            }
1786
        }
1787

1788
        Ok(())
40✔
1789
    }
41✔
1790

1791
    /// Function called before entering the peer loop. Reads the potentially stored
1792
    /// peer standing from the database and does other book-keeping before entering
1793
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1794
    /// accepted for a connection for this loop to be entered. So we don't need
1795
    /// to check the standing again.
1796
    ///
1797
    /// Locking:
1798
    ///   * acquires `global_state_lock` for write
1799
    pub(crate) async fn run_wrapper<S>(
38✔
1800
        &mut self,
38✔
1801
        mut peer: S,
38✔
1802
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
38✔
1803
    ) -> Result<()>
38✔
1804
    where
38✔
1805
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
38✔
1806
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
38✔
1807
        <S as TryStream>::Error: std::error::Error,
38✔
1808
    {
38✔
1809
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1810

1811
        let cli_args = self.global_state_lock.cli().clone();
38✔
1812

1813
        let standing = self
38✔
1814
            .global_state_lock
38✔
1815
            .lock_guard()
38✔
1816
            .await
38✔
1817
            .net
1818
            .peer_databases
1819
            .peer_standings
1820
            .get(self.peer_address.ip())
38✔
1821
            .await
38✔
1822
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
37✔
1823

1824
        // Add peer to peer map
1825
        let peer_connection_info = PeerConnectionInfo::new(
37✔
1826
            self.peer_handshake_data.listen_port,
37✔
1827
            self.peer_address,
37✔
1828
            self.inbound_connection,
37✔
1829
        );
1830
        let new_peer = PeerInfo::new(
37✔
1831
            peer_connection_info,
37✔
1832
            &self.peer_handshake_data,
37✔
1833
            SystemTime::now(),
37✔
1834
            cli_args.peer_tolerance,
37✔
1835
        )
1836
        .with_standing(standing);
37✔
1837

1838
        // If timestamps are different, we currently just log a warning.
1839
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
37✔
1840
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
37✔
1841
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
37✔
1842
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
37✔
1843
        {
1844
            let own_datetime_utc: DateTime<Utc> =
×
1845
                new_peer.own_timestamp_connection_established.into();
×
1846
            let peer_datetime_utc: DateTime<Utc> =
×
1847
                new_peer.peer_timestamp_connection_established.into();
×
1848
            warn!(
×
1849
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1850
                new_peer.connected_address(),
×
1851
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1852
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1853
        }
37✔
1854

1855
        // Multiple tasks might attempt to set up a connection concurrently. So
1856
        // even though we've checked that this connection is allowed, this check
1857
        // could have been invalidated by another task, for one accepting an
1858
        // incoming connection from a peer we're currently connecting to. So we
1859
        // need to make the a check again while holding a write-lock, since
1860
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
1861
        // since it would have to be dropped before acquiring the write-lock.
1862
        {
1863
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
37✔
1864
            let peer_map = &mut global_state.net.peer_map;
37✔
1865
            if peer_map
37✔
1866
                .values()
37✔
1867
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
37✔
1868
            {
1869
                bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1870
            }
37✔
1871

1872
            if peer_map.len() >= cli_args.max_num_peers {
37✔
1873
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1874
            }
37✔
1875

1876
            if peer_map.contains_key(&self.peer_address) {
37✔
1877
                // This shouldn't be possible, unless the peer reports a different instance ID than
1878
                // for the other connection. Only a malignant client would do that.
1879
                bail!("Already connected to peer. Aborting connection");
×
1880
            }
37✔
1881

1882
            peer_map.insert(self.peer_address, new_peer);
37✔
1883
        }
1884

1885
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1886
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
37✔
1887

1888
        // If peer indicates more canonical block, request a block notification to catch up ASAP
1889
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
37✔
1890
            > self
37✔
1891
                .global_state_lock
37✔
1892
                .lock_guard()
37✔
1893
                .await
37✔
1894
                .chain
1895
                .light_state()
37✔
1896
                .kernel
1897
                .header
1898
                .cumulative_proof_of_work
1899
        {
1900
            // Send block notification request to catch up ASAP, in case we're
1901
            // behind the newly-connected peer.
1902
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1903
        }
37✔
1904

1905
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
37✔
1906
        debug!("Exited peer loop for {}", self.peer_address);
31✔
1907

1908
        close_peer_connected_callback(
31✔
1909
            self.global_state_lock.clone(),
31✔
1910
            self.peer_address,
31✔
1911
            &self.to_main_tx,
31✔
1912
        )
31✔
1913
        .await;
31✔
1914

1915
        debug!("Ending peer loop for {}", self.peer_address);
31✔
1916

1917
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1918
        // feature to have for testing purposes.
1919
        res
31✔
1920
    }
31✔
1921

1922
    /// Register graceful peer disconnection in the global state.
1923
    ///
1924
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1925
    ///
1926
    /// # Locking:
1927
    ///   * acquires `global_state_lock` for write
1928
    ///
1929
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1930
    async fn register_peer_disconnection(&mut self) {
×
1931
        let peer_id = self.peer_handshake_data.instance_id;
×
1932
        self.global_state_lock
×
1933
            .lock_guard_mut()
×
1934
            .await
×
1935
            .net
1936
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1937
    }
×
1938
}
1939

1940
#[cfg(test)]
1941
#[cfg_attr(coverage_nightly, coverage(off))]
1942
mod tests {
1943
    use macro_rules_attr::apply;
1944
    use rand::rngs::StdRng;
1945
    use rand::Rng;
1946
    use rand::SeedableRng;
1947
    use tokio::sync::mpsc::error::TryRecvError;
1948
    use tracing_test::traced_test;
1949

1950
    use super::*;
1951
    use crate::config_models::cli_args;
1952
    use crate::config_models::network::Network;
1953
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1954
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1955
    use crate::models::peer::transaction_notification::TransactionNotification;
1956
    use crate::models::state::mempool::TransactionOrigin;
1957
    use crate::models::state::tx_creation_config::TxCreationConfig;
1958
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1959
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1960
    use crate::tests::shared::fake_valid_block_for_tests;
1961
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1962
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1963
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1964
    use crate::tests::shared::get_dummy_socket_address;
1965
    use crate::tests::shared::get_test_genesis_setup;
1966
    use crate::tests::shared::invalid_empty_single_proof_transaction;
1967
    use crate::tests::shared::Action;
1968
    use crate::tests::shared::Mock;
1969
    use crate::tests::shared_tokio_runtime;
1970

1971
    #[traced_test]
1972
    #[apply(shared_tokio_runtime)]
1973
    async fn test_peer_loop_bye() -> Result<()> {
1974
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1975

1976
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1977
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default()).await?;
1978

1979
        let peer_address = get_dummy_socket_address(2);
1980
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1981
        let mut peer_loop_handler =
1982
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1983
        peer_loop_handler
1984
            .run_wrapper(mock, from_main_rx_clone)
1985
            .await?;
1986

1987
        assert_eq!(
1988
            2,
1989
            state_lock.lock_guard().await.net.peer_map.len(),
1990
            "peer map length must be back to 2 after goodbye"
1991
        );
1992

1993
        Ok(())
1994
    }
1995

1996
    #[traced_test]
1997
    #[apply(shared_tokio_runtime)]
1998
    async fn test_peer_loop_peer_list() {
1999
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
2000
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default())
2001
                .await
2002
                .unwrap();
2003

2004
        let mut peer_infos = state_lock
2005
            .lock_guard()
2006
            .await
2007
            .net
2008
            .peer_map
2009
            .clone()
2010
            .into_values()
2011
            .collect::<Vec<_>>();
2012
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2013
        let (peer_address0, instance_id0) = (
2014
            peer_infos[0].connected_address(),
2015
            peer_infos[0].instance_id(),
2016
        );
2017
        let (peer_address1, instance_id1) = (
2018
            peer_infos[1].connected_address(),
2019
            peer_infos[1].instance_id(),
2020
        );
2021

2022
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Beta, 2);
2023
        let expected_response = vec![
2024
            (peer_address0, instance_id0),
2025
            (peer_address1, instance_id1),
2026
            (sa2, hsd2.instance_id),
2027
        ];
2028
        let mock = Mock::new(vec![
2029
            Action::Read(PeerMessage::PeerListRequest),
2030
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
2031
            Action::Read(PeerMessage::Bye),
2032
        ]);
2033

2034
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2035

2036
        let mut peer_loop_handler =
2037
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
2038
        peer_loop_handler
2039
            .run_wrapper(mock, from_main_rx_clone)
2040
            .await
2041
            .unwrap();
2042

2043
        assert_eq!(
2044
            2,
2045
            state_lock.lock_guard().await.net.peer_map.len(),
2046
            "peer map must have length 2 after saying goodbye to peer 2"
2047
        );
2048
    }
2049

2050
    #[traced_test]
2051
    #[apply(shared_tokio_runtime)]
2052
    async fn different_genesis_test() -> Result<()> {
2053
        // In this scenario a peer provides another genesis block than what has been
2054
        // hardcoded. This should lead to the closing of the connection to this peer
2055
        // and a ban.
2056

2057
        let network = Network::Main;
2058
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2059
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2060
        assert_eq!(1000, state_lock.cli().peer_tolerance);
2061
        let peer_address = get_dummy_socket_address(0);
2062

2063
        // Although the database is empty, `get_latest_block` still returns the genesis block,
2064
        // since that block is hardcoded.
2065
        let mut different_genesis_block = state_lock
2066
            .lock_guard()
2067
            .await
2068
            .chain
2069
            .archival_state()
2070
            .get_tip()
2071
            .await;
2072

2073
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2074
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2075
            &different_genesis_block,
2076
            Timestamp::hours(1),
2077
            StdRng::seed_from_u64(5550001).random(),
2078
            network,
2079
        )
2080
        .await;
2081
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2082
            block_1_with_different_genesis.try_into().unwrap(),
2083
        )))]);
2084

2085
        let mut peer_loop_handler = PeerLoopHandler::new(
2086
            to_main_tx.clone(),
2087
            state_lock.clone(),
2088
            peer_address,
2089
            hsd,
2090
            true,
2091
            1,
2092
        );
2093
        let res = peer_loop_handler
2094
            .run_wrapper(mock, from_main_rx_clone)
2095
            .await;
2096
        assert!(
2097
            res.is_err(),
2098
            "run_wrapper must return failure when genesis is different"
2099
        );
2100

2101
        match to_main_rx1.recv().await {
2102
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2103
            _ => bail!("Must receive remove of peer block max height"),
2104
        }
2105

2106
        // Verify that no further message was sent to main loop
2107
        match to_main_rx1.try_recv() {
2108
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2109
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2110
        };
2111

2112
        drop(to_main_tx);
2113

2114
        let peer_standing = state_lock
2115
            .lock_guard()
2116
            .await
2117
            .net
2118
            .get_peer_standing_from_database(peer_address.ip())
2119
            .await;
2120
        assert_eq!(
2121
            -i32::from(state_lock.cli().peer_tolerance),
2122
            peer_standing.unwrap().standing
2123
        );
2124
        assert_eq!(
2125
            NegativePeerSanction::DifferentGenesis,
2126
            peer_standing.unwrap().latest_punishment.unwrap().0
2127
        );
2128

2129
        Ok(())
2130
    }
2131

2132
    #[traced_test]
2133
    #[apply(shared_tokio_runtime)]
2134
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
2135
    {
2136
        let args = cli_args::Args::default();
2137
        let network = args.network;
2138
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
2139
            get_test_genesis_setup(network, 0, args).await?;
2140

2141
        let peer_address = get_dummy_socket_address(0);
2142
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
2143
        let peer_id = peer_handshake_data.instance_id;
2144
        let mut peer_loop_handler = PeerLoopHandler::new(
2145
            to_main_tx,
2146
            state_lock.clone(),
2147
            peer_address,
2148
            peer_handshake_data,
2149
            true,
2150
            1,
2151
        );
2152
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2153
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
2154

2155
        let global_state = state_lock.lock_guard().await;
2156
        assert!(global_state
2157
            .net
2158
            .last_disconnection_time_of_peer(peer_id)
2159
            .is_none());
2160

2161
        drop(to_main_rx);
2162
        drop(from_main_tx);
2163

2164
        Ok(())
2165
    }
2166

2167
    #[traced_test]
2168
    #[apply(shared_tokio_runtime)]
2169
    async fn block_without_valid_pow_test() -> Result<()> {
2170
        // In this scenario, a block without a valid PoW is received. This block should be rejected
2171
        // by the peer loop and a notification should never reach the main loop.
2172

2173
        let network = Network::Main;
2174
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2175
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2176
        let peer_address = get_dummy_socket_address(0);
2177
        let genesis_block: Block = state_lock
2178
            .lock_guard()
2179
            .await
2180
            .chain
2181
            .archival_state()
2182
            .get_tip()
2183
            .await;
2184

2185
        // Make a with hash above what the implied threshold from
2186
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
2187
            &genesis_block,
2188
            Timestamp::hours(1),
2189
            StdRng::seed_from_u64(5550001).random(),
2190
            network,
2191
        )
2192
        .await;
2193

2194
        // This *probably* is invalid PoW -- and needs to be for this test to
2195
        // work.
2196
        block_without_valid_pow.set_header_nonce(Digest::default());
2197

2198
        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
2199
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
2200
        let mock = Mock::new(vec![
2201
            Action::Read(PeerMessage::Block(Box::new(
2202
                block_without_valid_pow.clone().try_into().unwrap(),
2203
            ))),
2204
            Action::Read(PeerMessage::Bye),
2205
        ]);
2206

2207
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2208

2209
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2210
            to_main_tx.clone(),
2211
            state_lock.clone(),
2212
            peer_address,
2213
            hsd,
2214
            true,
2215
            1,
2216
            block_without_valid_pow.header().timestamp,
2217
        );
2218
        peer_loop_handler
2219
            .run_wrapper(mock, from_main_rx_clone)
2220
            .await
2221
            .expect("sending (one) invalid block should not result in closed connection");
2222

2223
        match to_main_rx1.recv().await {
2224
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2225
            _ => bail!("Must receive remove of peer block max height"),
2226
        }
2227

2228
        // Verify that no further message was sent to main loop
2229
        match to_main_rx1.try_recv() {
2230
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2231
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2232
        };
2233

2234
        // We need to have the transmitter in scope until we have received from it
2235
        // otherwise the receiver will report the disconnected error when we attempt
2236
        // to read from it. And the purpose is to verify that the channel is empty,
2237
        // not that it has been closed.
2238
        drop(to_main_tx);
2239

2240
        // Verify that peer standing was stored in database
2241
        let standing = state_lock
2242
            .lock_guard()
2243
            .await
2244
            .net
2245
            .peer_databases
2246
            .peer_standings
2247
            .get(peer_address.ip())
2248
            .await
2249
            .unwrap();
2250
        assert!(
2251
            standing.standing < 0,
2252
            "Peer must be sanctioned for sending a bad block"
2253
        );
2254

2255
        Ok(())
2256
    }
2257

2258
    #[traced_test]
2259
    #[apply(shared_tokio_runtime)]
2260
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2261
        // The scenario tested here is that a client receives a block that is already
2262
        // known and stored. The expected behavior is to ignore the block and not send
2263
        // a message to the main task.
2264

2265
        let network = Network::Main;
2266
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
2267
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2268
        let peer_address = get_dummy_socket_address(0);
2269
        let genesis_block: Block = Block::genesis(network);
2270

2271
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
2272
        let block_1 =
2273
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2274
        assert!(
2275
            block_1.is_valid(&genesis_block, now, network).await,
2276
            "Block must be valid for this test to make sense"
2277
        );
2278
        alice.set_new_tip(block_1.clone()).await?;
2279

2280
        let mock_peer_messages = Mock::new(vec![
2281
            Action::Read(PeerMessage::Block(Box::new(
2282
                block_1.clone().try_into().unwrap(),
2283
            ))),
2284
            Action::Read(PeerMessage::Bye),
2285
        ]);
2286

2287
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2288

2289
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2290
            to_main_tx.clone(),
2291
            alice.clone(),
2292
            peer_address,
2293
            hsd,
2294
            false,
2295
            1,
2296
            block_1.header().timestamp,
2297
        );
2298
        alice_peer_loop_handler
2299
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
2300
            .await?;
2301

2302
        match to_main_rx1.recv().await {
2303
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2304
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2305
        }
2306
        match to_main_rx1.try_recv() {
2307
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2308
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2309
        };
2310
        drop(to_main_tx);
2311

2312
        if !alice.lock_guard().await.net.peer_map.is_empty() {
2313
            bail!("peer map must be empty after closing connection gracefully");
2314
        }
2315

2316
        Ok(())
2317
    }
2318

2319
    #[traced_test]
2320
    #[apply(shared_tokio_runtime)]
2321
    async fn block_request_batch_simple() {
2322
        // Scenario: Six blocks (including genesis) are known. Peer requests
2323
        // from all possible starting points, and client responds with the
2324
        // correct list of blocks.
2325
        let network = Network::Main;
2326
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2327
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2328
                .await
2329
                .unwrap();
2330
        let genesis_block: Block = Block::genesis(network);
2331
        let peer_address = get_dummy_socket_address(0);
2332
        let [block_1, block_2, block_3, block_4, block_5] =
2333
            fake_valid_sequence_of_blocks_for_tests(
2334
                &genesis_block,
2335
                Timestamp::hours(1),
2336
                StdRng::seed_from_u64(5550001).random(),
2337
                network,
2338
            )
2339
            .await;
2340
        let blocks = vec![
2341
            genesis_block,
2342
            block_1,
2343
            block_2,
2344
            block_3,
2345
            block_4,
2346
            block_5.clone(),
2347
        ];
2348
        for block in blocks.iter().skip(1) {
2349
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
2350
        }
2351

2352
        let mmra = state_lock
2353
            .lock_guard()
2354
            .await
2355
            .chain
2356
            .archival_state()
2357
            .archival_block_mmr
2358
            .ammr()
2359
            .to_accumulator_async()
2360
            .await;
2361
        for i in 0..=4 {
2362
            let expected_response = {
2363
                let state = state_lock.lock_guard().await;
2364
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
2365
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
2366
                    .await
2367
                    .unwrap()
2368
            };
2369
            let mock = Mock::new(vec![
2370
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2371
                    known_blocks: vec![blocks[i].hash()],
2372
                    max_response_len: 14,
2373
                    anchor: mmra.clone(),
2374
                })),
2375
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
2376
                Action::Read(PeerMessage::Bye),
2377
            ]);
2378
            let mut peer_loop_handler = PeerLoopHandler::new(
2379
                to_main_tx.clone(),
2380
                state_lock.clone(),
2381
                peer_address,
2382
                hsd.clone(),
2383
                false,
2384
                1,
2385
            );
2386

2387
            peer_loop_handler
2388
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
2389
                .await
2390
                .unwrap();
2391
        }
2392
    }
2393

2394
    #[traced_test]
2395
    #[apply(shared_tokio_runtime)]
2396
    async fn block_request_batch_in_order_test() -> Result<()> {
2397
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2398
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
2399
        // are returned.
2400

2401
        let network = Network::Main;
2402
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2403
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2404
        let genesis_block: Block = Block::genesis(network);
2405
        let peer_address = get_dummy_socket_address(0);
2406
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2407
            &genesis_block,
2408
            Timestamp::hours(1),
2409
            StdRng::seed_from_u64(5550001).random(),
2410
            network,
2411
        )
2412
        .await;
2413
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2414
            &block_1,
2415
            Timestamp::hours(1),
2416
            StdRng::seed_from_u64(5550002).random(),
2417
            network,
2418
        )
2419
        .await;
2420
        assert_ne!(block_2_b.hash(), block_2_a.hash());
2421

2422
        state_lock.set_new_tip(block_1.clone()).await?;
2423
        state_lock.set_new_tip(block_2_a.clone()).await?;
2424
        state_lock.set_new_tip(block_2_b.clone()).await?;
2425
        state_lock.set_new_tip(block_3_b.clone()).await?;
2426
        state_lock.set_new_tip(block_3_a.clone()).await?;
2427

2428
        let anchor = state_lock
2429
            .lock_guard()
2430
            .await
2431
            .chain
2432
            .archival_state()
2433
            .archival_block_mmr
2434
            .ammr()
2435
            .to_accumulator_async()
2436
            .await;
2437
        let response_1 = {
2438
            let state_lock = state_lock.lock_guard().await;
2439
            PeerLoopHandler::batch_response(
2440
                &state_lock,
2441
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
2442
                &anchor,
2443
            )
2444
            .await
2445
            .unwrap()
2446
        };
2447

2448
        let mut mock = Mock::new(vec![
2449
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2450
                known_blocks: vec![genesis_block.hash()],
2451
                max_response_len: 14,
2452
                anchor: anchor.clone(),
2453
            })),
2454
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
2455
            Action::Read(PeerMessage::Bye),
2456
        ]);
2457

2458
        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
2459
            to_main_tx.clone(),
2460
            state_lock.clone(),
2461
            peer_address,
2462
            hsd.clone(),
2463
            false,
2464
            1,
2465
            block_3_a.header().timestamp,
2466
        );
2467

2468
        peer_loop_handler_1
2469
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
2470
            .await?;
2471

2472
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2473
        let response_2 = {
2474
            let state_lock = state_lock.lock_guard().await;
2475
            PeerLoopHandler::batch_response(
2476
                &state_lock,
2477
                vec![block_2_a, block_3_a.clone()],
2478
                &anchor,
2479
            )
2480
            .await
2481
            .unwrap()
2482
        };
2483
        mock = Mock::new(vec![
2484
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2485
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
2486
                max_response_len: 14,
2487
                anchor,
2488
            })),
2489
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
2490
            Action::Read(PeerMessage::Bye),
2491
        ]);
2492

2493
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2494
            to_main_tx.clone(),
2495
            state_lock.clone(),
2496
            peer_address,
2497
            hsd,
2498
            false,
2499
            1,
2500
            block_3_a.header().timestamp,
2501
        );
2502

2503
        peer_loop_handler_2
2504
            .run_wrapper(mock, from_main_rx_clone)
2505
            .await?;
2506

2507
        Ok(())
2508
    }
2509

2510
    #[traced_test]
2511
    #[apply(shared_tokio_runtime)]
2512
    async fn block_request_batch_out_of_order_test() -> Result<()> {
2513
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2514
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
2515
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
2516
        // The blocks will be supplied in the correct order but starting from the first digest in
2517
        // the list that is known and canonical.
2518

2519
        let network = Network::Main;
2520
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2521
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2522
        let genesis_block = Block::genesis(network);
2523
        let peer_address = get_dummy_socket_address(0);
2524
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2525
            &genesis_block,
2526
            Timestamp::hours(1),
2527
            StdRng::seed_from_u64(5550001).random(),
2528
            network,
2529
        )
2530
        .await;
2531
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2532
            &block_1,
2533
            Timestamp::hours(1),
2534
            StdRng::seed_from_u64(5550002).random(),
2535
            network,
2536
        )
2537
        .await;
2538
        assert_ne!(block_2_a.hash(), block_2_b.hash());
2539

2540
        state_lock.set_new_tip(block_1.clone()).await?;
2541
        state_lock.set_new_tip(block_2_a.clone()).await?;
2542
        state_lock.set_new_tip(block_2_b.clone()).await?;
2543
        state_lock.set_new_tip(block_3_b.clone()).await?;
2544
        state_lock.set_new_tip(block_3_a.clone()).await?;
2545

2546
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2547
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
2548
        expected_anchor.append(block_3_a.hash());
2549
        let state_anchor = state_lock
2550
            .lock_guard()
2551
            .await
2552
            .chain
2553
            .archival_state()
2554
            .archival_block_mmr
2555
            .ammr()
2556
            .to_accumulator_async()
2557
            .await;
2558
        assert_eq!(
2559
            expected_anchor, state_anchor,
2560
            "Catching assumption about MMRA in tip and in archival state"
2561
        );
2562

2563
        let response = {
2564
            let state_lock = state_lock.lock_guard().await;
2565
            PeerLoopHandler::batch_response(
2566
                &state_lock,
2567
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
2568
                &expected_anchor,
2569
            )
2570
            .await
2571
            .unwrap()
2572
        };
2573
        let mock = Mock::new(vec![
2574
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2575
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
2576
                max_response_len: 14,
2577
                anchor: expected_anchor,
2578
            })),
2579
            // Since genesis block is the 1st known in the list of known blocks,
2580
            // it's immediate descendent, block_1, is the first one returned.
2581
            Action::Write(PeerMessage::BlockResponseBatch(response)),
2582
            Action::Read(PeerMessage::Bye),
2583
        ]);
2584

2585
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2586
            to_main_tx.clone(),
2587
            state_lock.clone(),
2588
            peer_address,
2589
            hsd,
2590
            false,
2591
            1,
2592
            block_3_a.header().timestamp,
2593
        );
2594

2595
        peer_loop_handler_2
2596
            .run_wrapper(mock, from_main_rx_clone)
2597
            .await?;
2598

2599
        Ok(())
2600
    }
2601

2602
    #[traced_test]
2603
    #[apply(shared_tokio_runtime)]
2604
    async fn request_unknown_height_doesnt_crash() {
2605
        // Scenario: Only genesis block is known. Peer requests block of height
2606
        // 2.
2607
        let network = Network::Main;
2608
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2609
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2610
                .await
2611
                .unwrap();
2612
        let peer_address = get_dummy_socket_address(0);
2613
        let mock = Mock::new(vec![
2614
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2615
            Action::Read(PeerMessage::Bye),
2616
        ]);
2617

2618
        let mut peer_loop_handler = PeerLoopHandler::new(
2619
            to_main_tx.clone(),
2620
            state_lock.clone(),
2621
            peer_address,
2622
            hsd,
2623
            false,
2624
            1,
2625
        );
2626

2627
        // This will return error if seen read/write order does not match that of the
2628
        // mocked object.
2629
        peer_loop_handler
2630
            .run_wrapper(mock, from_main_rx_clone)
2631
            .await
2632
            .unwrap();
2633

2634
        // Verify that peer is sanctioned for this nonsense.
2635
        assert!(state_lock
2636
            .lock_guard()
2637
            .await
2638
            .net
2639
            .get_peer_standing_from_database(peer_address.ip())
2640
            .await
2641
            .unwrap()
2642
            .standing
2643
            .is_negative());
2644
    }
2645

2646
    #[traced_test]
2647
    #[apply(shared_tokio_runtime)]
2648
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
2649
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2650
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
2651
        // returned.
2652

2653
        let network = Network::Main;
2654
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2655
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2656
        let genesis_block = Block::genesis(network);
2657
        let peer_address = get_dummy_socket_address(0);
2658

2659
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2660
            &genesis_block,
2661
            Timestamp::hours(1),
2662
            StdRng::seed_from_u64(5550001).random(),
2663
            network,
2664
        )
2665
        .await;
2666
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2667
            &block_1,
2668
            Timestamp::hours(1),
2669
            StdRng::seed_from_u64(5550002).random(),
2670
            network,
2671
        )
2672
        .await;
2673
        assert_ne!(block_2_a.hash(), block_2_b.hash());
2674

2675
        state_lock.set_new_tip(block_1.clone()).await?;
2676
        state_lock.set_new_tip(block_2_a.clone()).await?;
2677
        state_lock.set_new_tip(block_2_b.clone()).await?;
2678
        state_lock.set_new_tip(block_3_b.clone()).await?;
2679
        state_lock.set_new_tip(block_3_a.clone()).await?;
2680

2681
        let mock = Mock::new(vec![
2682
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2683
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
2684
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
2685
            Action::Write(PeerMessage::Block(Box::new(
2686
                block_3_a.clone().try_into().unwrap(),
2687
            ))),
2688
            Action::Read(PeerMessage::Bye),
2689
        ]);
2690

2691
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2692
            to_main_tx.clone(),
2693
            state_lock.clone(),
2694
            peer_address,
2695
            hsd,
2696
            false,
2697
            1,
2698
            block_3_a.header().timestamp,
2699
        );
2700

2701
        // This will return error if seen read/write order does not match that of the
2702
        // mocked object.
2703
        peer_loop_handler
2704
            .run_wrapper(mock, from_main_rx_clone)
2705
            .await?;
2706

2707
        Ok(())
2708
    }
2709

2710
    #[traced_test]
2711
    #[apply(shared_tokio_runtime)]
2712
    async fn receival_of_block_notification_height_1() {
2713
        // Scenario: client only knows genesis block. Then receives block
2714
        // notification of height 1. Must request block 1.
2715
        let network = Network::Main;
2716
        let mut rng = StdRng::seed_from_u64(5552401);
2717
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2718
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2719
                .await
2720
                .unwrap();
2721
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2722
        let notification_height1 = (&block_1).into();
2723
        let mock = Mock::new(vec![
2724
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
2725
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2726
            Action::Read(PeerMessage::Bye),
2727
        ]);
2728

2729
        let peer_address = get_dummy_socket_address(0);
2730
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2731
            to_main_tx.clone(),
2732
            state_lock.clone(),
2733
            peer_address,
2734
            hsd,
2735
            false,
2736
            1,
2737
            block_1.header().timestamp,
2738
        );
2739
        peer_loop_handler
2740
            .run_wrapper(mock, from_main_rx_clone)
2741
            .await
2742
            .unwrap();
2743

2744
        drop(to_main_rx1);
2745
    }
2746

2747
    #[traced_test]
2748
    #[apply(shared_tokio_runtime)]
2749
    async fn receive_block_request_by_height_block_7() {
2750
        // Scenario: client only knows blocks up to height 7. Then receives block-
2751
        // request-by-height for height 7. Must respond with block 7.
2752
        let network = Network::Main;
2753
        let mut rng = StdRng::seed_from_u64(5552401);
2754
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
2755
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2756
                .await
2757
                .unwrap();
2758
        let genesis_block = Block::genesis(network);
2759
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2760
            &genesis_block,
2761
            Timestamp::hours(1),
2762
            rng.random(),
2763
            network,
2764
        )
2765
        .await;
2766
        let block7 = blocks.last().unwrap().to_owned();
2767
        let tip_height: u64 = block7.header().height.into();
2768
        assert_eq!(7, tip_height);
2769

2770
        for block in &blocks {
2771
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
2772
        }
2773

2774
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2775
        let mock = Mock::new(vec![
2776
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2777
            Action::Write(block7_response),
2778
            Action::Read(PeerMessage::Bye),
2779
        ]);
2780

2781
        let peer_address = get_dummy_socket_address(0);
2782
        let mut peer_loop_handler = PeerLoopHandler::new(
2783
            to_main_tx.clone(),
2784
            state_lock.clone(),
2785
            peer_address,
2786
            hsd,
2787
            false,
2788
            1,
2789
        );
2790
        peer_loop_handler
2791
            .run_wrapper(mock, from_main_rx_clone)
2792
            .await
2793
            .unwrap();
2794

2795
        drop(to_main_rx1);
2796
    }
2797

2798
    #[traced_test]
2799
    #[apply(shared_tokio_runtime)]
2800
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2801
        // Scenario: client only knows genesis block. Then receives block 1.
2802

2803
        let network = Network::Main;
2804
        let mut rng = StdRng::seed_from_u64(5550001);
2805
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2806
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2807
        let peer_address = get_dummy_socket_address(0);
2808

2809
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2810
        let mock = Mock::new(vec![
2811
            Action::Read(PeerMessage::Block(Box::new(
2812
                block_1.clone().try_into().unwrap(),
2813
            ))),
2814
            Action::Read(PeerMessage::Bye),
2815
        ]);
2816

2817
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2818
            to_main_tx.clone(),
2819
            state_lock.clone(),
2820
            peer_address,
2821
            hsd,
2822
            false,
2823
            1,
2824
            block_1.header().timestamp,
2825
        );
2826
        peer_loop_handler
2827
            .run_wrapper(mock, from_main_rx_clone)
2828
            .await?;
2829

2830
        // Verify that a block was sent to `main_loop`
2831
        match to_main_rx1.recv().await {
2832
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
2833
            _ => bail!("Did not find msg sent to main task"),
2834
        };
2835

2836
        match to_main_rx1.recv().await {
2837
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2838
            _ => bail!("Must receive remove of peer block max height"),
2839
        }
2840

2841
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2842
            bail!("peer map must be empty after closing connection gracefully");
2843
        }
2844

2845
        Ok(())
2846
    }
2847

2848
    #[traced_test]
2849
    #[apply(shared_tokio_runtime)]
2850
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
2851
        // In this scenario, the client only knows the genesis block (block 0) and then
2852
        // receives block 2, meaning that block 1 will have to be requested.
2853

2854
        let network = Network::Main;
2855
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2856
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2857
        let peer_address = get_dummy_socket_address(0);
2858
        let genesis_block: Block = state_lock
2859
            .lock_guard()
2860
            .await
2861
            .chain
2862
            .archival_state()
2863
            .get_tip()
2864
            .await;
2865
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
2866
            &genesis_block,
2867
            Timestamp::hours(1),
2868
            StdRng::seed_from_u64(5550001).random(),
2869
            network,
2870
        )
2871
        .await;
2872

2873
        let mock = Mock::new(vec![
2874
            Action::Read(PeerMessage::Block(Box::new(
2875
                block_2.clone().try_into().unwrap(),
2876
            ))),
2877
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
2878
            Action::Read(PeerMessage::Block(Box::new(
2879
                block_1.clone().try_into().unwrap(),
2880
            ))),
2881
            Action::Read(PeerMessage::Bye),
2882
        ]);
2883

2884
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2885
            to_main_tx.clone(),
2886
            state_lock.clone(),
2887
            peer_address,
2888
            hsd,
2889
            true,
2890
            1,
2891
            block_2.header().timestamp,
2892
        );
2893
        peer_loop_handler
2894
            .run_wrapper(mock, from_main_rx_clone)
2895
            .await?;
2896

2897
        match to_main_rx1.recv().await {
2898
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
2899
                if blocks[0].hash() != block_1.hash() {
2900
                    bail!("1st received block by main loop must be block 1");
2901
                }
2902
                if blocks[1].hash() != block_2.hash() {
2903
                    bail!("2nd received block by main loop must be block 2");
2904
                }
2905
            }
2906
            _ => bail!("Did not find msg sent to main task 1"),
2907
        };
2908
        match to_main_rx1.recv().await {
2909
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2910
            _ => bail!("Must receive remove of peer block max height"),
2911
        }
2912

2913
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2914
            bail!("peer map must be empty after closing connection gracefully");
2915
        }
2916

2917
        Ok(())
2918
    }
2919

2920
    #[traced_test]
2921
    #[apply(shared_tokio_runtime)]
2922
    async fn prevent_ram_exhaustion_test() -> Result<()> {
2923
        // In this scenario the peer sends more blocks than the client allows to store in the
2924
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
2925
        // process as the alternative is that the program will crash because it runs out of RAM.
2926

2927
        let network = Network::Main;
2928
        let mut rng = StdRng::seed_from_u64(5550001);
2929
        let (
2930
            _peer_broadcast_tx,
2931
            from_main_rx_clone,
2932
            to_main_tx,
2933
            mut to_main_rx1,
2934
            mut state_lock,
2935
            _hsd,
2936
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
2937
        let genesis_block = Block::genesis(network);
2938

2939
        // Restrict max number of blocks held in memory to 2.
2940
        let mut cli = state_lock.cli().clone();
2941
        cli.sync_mode_threshold = 2;
2942
        state_lock.set_cli(cli).await;
2943

2944
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Beta, 1);
2945
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
2946
            &genesis_block,
2947
            Timestamp::hours(1),
2948
            rng.random(),
2949
            network,
2950
        )
2951
        .await;
2952
        state_lock.set_new_tip(block_1.clone()).await?;
2953

2954
        let mock = Mock::new(vec![
2955
            Action::Read(PeerMessage::Block(Box::new(
2956
                block_4.clone().try_into().unwrap(),
2957
            ))),
2958
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
2959
            Action::Read(PeerMessage::Block(Box::new(
2960
                block_3.clone().try_into().unwrap(),
2961
            ))),
2962
            Action::Read(PeerMessage::Bye),
2963
        ]);
2964

2965
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2966
            to_main_tx.clone(),
2967
            state_lock.clone(),
2968
            peer_address1,
2969
            hsd1,
2970
            true,
2971
            1,
2972
            block_4.header().timestamp,
2973
        );
2974
        peer_loop_handler
2975
            .run_wrapper(mock, from_main_rx_clone)
2976
            .await?;
2977

2978
        match to_main_rx1.recv().await {
2979
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2980
            _ => bail!("Must receive remove of peer block max height"),
2981
        }
2982

2983
        // Verify that no block is sent to main loop.
2984
        match to_main_rx1.try_recv() {
2985
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2986
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
2987
        };
2988
        drop(to_main_tx);
2989

2990
        // Verify that peer is sanctioned for failed fork reconciliation attempt
2991
        assert!(state_lock
2992
            .lock_guard()
2993
            .await
2994
            .net
2995
            .get_peer_standing_from_database(peer_address1.ip())
2996
            .await
2997
            .unwrap()
2998
            .standing
2999
            .is_negative());
3000

3001
        Ok(())
3002
    }
3003

3004
    #[traced_test]
3005
    #[apply(shared_tokio_runtime)]
3006
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3007
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3008
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
3009

3010
        let network = Network::Main;
3011
        let (
3012
            _peer_broadcast_tx,
3013
            from_main_rx_clone,
3014
            to_main_tx,
3015
            mut to_main_rx1,
3016
            mut state_lock,
3017
            hsd,
3018
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3019
            .await
3020
            .unwrap();
3021
        let peer_address: SocketAddr = get_dummy_socket_address(0);
3022
        let genesis_block = Block::genesis(network);
3023
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3024
            &genesis_block,
3025
            Timestamp::hours(1),
3026
            StdRng::seed_from_u64(5550001).random(),
3027
            network,
3028
        )
3029
        .await;
3030
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
3031

3032
        let mock = Mock::new(vec![
3033
            Action::Read(PeerMessage::Block(Box::new(
3034
                block_4.clone().try_into().unwrap(),
3035
            ))),
3036
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3037
            Action::Read(PeerMessage::Block(Box::new(
3038
                block_3.clone().try_into().unwrap(),
3039
            ))),
3040
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3041
            Action::Read(PeerMessage::Block(Box::new(
3042
                block_2.clone().try_into().unwrap(),
3043
            ))),
3044
            Action::Read(PeerMessage::Bye),
3045
        ]);
3046

3047
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3048
            to_main_tx.clone(),
3049
            state_lock.clone(),
3050
            peer_address,
3051
            hsd,
3052
            true,
3053
            1,
3054
            block_4.header().timestamp,
3055
        );
3056
        peer_loop_handler
3057
            .run_wrapper(mock, from_main_rx_clone)
3058
            .await
3059
            .unwrap();
3060

3061
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3062
            panic!("Did not find msg sent to main task");
3063
        };
3064
        assert_eq!(blocks[0].hash(), block_2.hash());
3065
        assert_eq!(blocks[1].hash(), block_3.hash());
3066
        assert_eq!(blocks[2].hash(), block_4.hash());
3067

3068
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3069
            panic!("Must receive remove of peer block max height");
3070
        };
3071

3072
        assert!(
3073
            state_lock.lock_guard().await.net.peer_map.is_empty(),
3074
            "peer map must be empty after closing connection gracefully"
3075
        );
3076
    }
3077

3078
    #[traced_test]
3079
    #[apply(shared_tokio_runtime)]
3080
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
3081
        // In this scenario, the client only knows the genesis block (block 0) and then
3082
        // receives block 3, meaning that block 2 and 1 will have to be requested.
3083

3084
        let network = Network::Main;
3085
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
3086
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3087
        let peer_address = get_dummy_socket_address(0);
3088
        let genesis_block = Block::genesis(network);
3089

3090
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
3091
            &genesis_block,
3092
            Timestamp::hours(1),
3093
            StdRng::seed_from_u64(5550001).random(),
3094
            network,
3095
        )
3096
        .await;
3097

3098
        let mock = Mock::new(vec![
3099
            Action::Read(PeerMessage::Block(Box::new(
3100
                block_3.clone().try_into().unwrap(),
3101
            ))),
3102
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3103
            Action::Read(PeerMessage::Block(Box::new(
3104
                block_2.clone().try_into().unwrap(),
3105
            ))),
3106
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3107
            Action::Read(PeerMessage::Block(Box::new(
3108
                block_1.clone().try_into().unwrap(),
3109
            ))),
3110
            Action::Read(PeerMessage::Bye),
3111
        ]);
3112

3113
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3114
            to_main_tx.clone(),
3115
            state_lock.clone(),
3116
            peer_address,
3117
            hsd,
3118
            true,
3119
            1,
3120
            block_3.header().timestamp,
3121
        );
3122
        peer_loop_handler
3123
            .run_wrapper(mock, from_main_rx_clone)
3124
            .await?;
3125

3126
        match to_main_rx1.recv().await {
3127
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3128
                if blocks[0].hash() != block_1.hash() {
3129
                    bail!("1st received block by main loop must be block 1");
3130
                }
3131
                if blocks[1].hash() != block_2.hash() {
3132
                    bail!("2nd received block by main loop must be block 2");
3133
                }
3134
                if blocks[2].hash() != block_3.hash() {
3135
                    bail!("3rd received block by main loop must be block 3");
3136
                }
3137
            }
3138
            _ => bail!("Did not find msg sent to main task"),
3139
        };
3140
        match to_main_rx1.recv().await {
3141
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3142
            _ => bail!("Must receive remove of peer block max height"),
3143
        }
3144

3145
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3146
            bail!("peer map must be empty after closing connection gracefully");
3147
        }
3148

3149
        Ok(())
3150
    }
3151

3152
    #[traced_test]
3153
    #[apply(shared_tokio_runtime)]
3154
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3155
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3156
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3157
        // But the requests are interrupted by the peer sending another message: a new block
3158
        // notification.
3159

3160
        let network = Network::Main;
3161
        let (
3162
            _peer_broadcast_tx,
3163
            from_main_rx_clone,
3164
            to_main_tx,
3165
            mut to_main_rx1,
3166
            mut state_lock,
3167
            hsd,
3168
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3169
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3170
        let genesis_block: Block = state_lock
3171
            .lock_guard()
3172
            .await
3173
            .chain
3174
            .archival_state()
3175
            .get_tip()
3176
            .await;
3177

3178
        let [block_1, block_2, block_3, block_4, block_5] =
3179
            fake_valid_sequence_of_blocks_for_tests(
3180
                &genesis_block,
3181
                Timestamp::hours(1),
3182
                StdRng::seed_from_u64(5550001).random(),
3183
                network,
3184
            )
3185
            .await;
3186
        state_lock.set_new_tip(block_1.clone()).await?;
3187

3188
        let mock = Mock::new(vec![
3189
            Action::Read(PeerMessage::Block(Box::new(
3190
                block_4.clone().try_into().unwrap(),
3191
            ))),
3192
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3193
            Action::Read(PeerMessage::Block(Box::new(
3194
                block_3.clone().try_into().unwrap(),
3195
            ))),
3196
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3197
            //
3198
            // Now make the interruption of the block reconciliation process
3199
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3200
            //
3201
            // Complete the block reconciliation process by requesting the last block
3202
            // in this process, to get back to a mutually known block.
3203
            Action::Read(PeerMessage::Block(Box::new(
3204
                block_2.clone().try_into().unwrap(),
3205
            ))),
3206
            //
3207
            // Then anticipate the request of the block that was announced
3208
            // in the interruption.
3209
            // Note that we cannot anticipate the response, as only the main
3210
            // task writes to the database. And the database needs to be updated
3211
            // for the handling of block 5 to be done correctly.
3212
            Action::Write(PeerMessage::BlockRequestByHeight(
3213
                block_5.kernel.header.height,
3214
            )),
3215
            Action::Read(PeerMessage::Bye),
3216
        ]);
3217

3218
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3219
            to_main_tx.clone(),
3220
            state_lock.clone(),
3221
            peer_socket_address,
3222
            hsd,
3223
            false,
3224
            1,
3225
            block_5.header().timestamp,
3226
        );
3227
        peer_loop_handler
3228
            .run_wrapper(mock, from_main_rx_clone)
3229
            .await?;
3230

3231
        match to_main_rx1.recv().await {
3232
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3233
                if blocks[0].hash() != block_2.hash() {
3234
                    bail!("1st received block by main loop must be block 1");
3235
                }
3236
                if blocks[1].hash() != block_3.hash() {
3237
                    bail!("2nd received block by main loop must be block 2");
3238
                }
3239
                if blocks[2].hash() != block_4.hash() {
3240
                    bail!("3rd received block by main loop must be block 3");
3241
                }
3242
            }
3243
            _ => bail!("Did not find msg sent to main task"),
3244
        };
3245
        match to_main_rx1.recv().await {
3246
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3247
            _ => bail!("Must receive remove of peer block max height"),
3248
        }
3249

3250
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3251
            bail!("peer map must be empty after closing connection gracefully");
3252
        }
3253

3254
        Ok(())
3255
    }
3256

3257
    #[traced_test]
3258
    #[apply(shared_tokio_runtime)]
3259
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3260
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
3261
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3262
        // But the requests are interrupted by the peer sending another message: a request
3263
        // for a list of peers.
3264

3265
        let network = Network::Main;
3266
        let (
3267
            _peer_broadcast_tx,
3268
            from_main_rx_clone,
3269
            to_main_tx,
3270
            mut to_main_rx1,
3271
            mut state_lock,
3272
            _hsd,
3273
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3274
        let genesis_block = Block::genesis(network);
3275
        let peer_infos: Vec<PeerInfo> = state_lock
3276
            .lock_guard()
3277
            .await
3278
            .net
3279
            .peer_map
3280
            .clone()
3281
            .into_values()
3282
            .collect::<Vec<_>>();
3283

3284
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3285
            &genesis_block,
3286
            Timestamp::hours(1),
3287
            StdRng::seed_from_u64(5550001).random(),
3288
            network,
3289
        )
3290
        .await;
3291
        state_lock.set_new_tip(block_1.clone()).await?;
3292

3293
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3294
        let expected_peer_list_resp = vec![
3295
            (
3296
                peer_infos[0].listen_address().unwrap(),
3297
                peer_infos[0].instance_id(),
3298
            ),
3299
            (sa_1, hsd_1.instance_id),
3300
        ];
3301
        let mock = Mock::new(vec![
3302
            Action::Read(PeerMessage::Block(Box::new(
3303
                block_4.clone().try_into().unwrap(),
3304
            ))),
3305
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3306
            Action::Read(PeerMessage::Block(Box::new(
3307
                block_3.clone().try_into().unwrap(),
3308
            ))),
3309
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3310
            //
3311
            // Now make the interruption of the block reconciliation process
3312
            Action::Read(PeerMessage::PeerListRequest),
3313
            //
3314
            // Answer the request for a peer list
3315
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3316
            //
3317
            // Complete the block reconciliation process by requesting the last block
3318
            // in this process, to get back to a mutually known block.
3319
            Action::Read(PeerMessage::Block(Box::new(
3320
                block_2.clone().try_into().unwrap(),
3321
            ))),
3322
            Action::Read(PeerMessage::Bye),
3323
        ]);
3324

3325
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3326
            to_main_tx,
3327
            state_lock.clone(),
3328
            sa_1,
3329
            hsd_1,
3330
            true,
3331
            1,
3332
            block_4.header().timestamp,
3333
        );
3334
        peer_loop_handler
3335
            .run_wrapper(mock, from_main_rx_clone)
3336
            .await?;
3337

3338
        // Verify that blocks are sent to `main_loop` in expected ordering
3339
        match to_main_rx1.recv().await {
3340
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3341
                if blocks[0].hash() != block_2.hash() {
3342
                    bail!("1st received block by main loop must be block 1");
3343
                }
3344
                if blocks[1].hash() != block_3.hash() {
3345
                    bail!("2nd received block by main loop must be block 2");
3346
                }
3347
                if blocks[2].hash() != block_4.hash() {
3348
                    bail!("3rd received block by main loop must be block 3");
3349
                }
3350
            }
3351
            _ => bail!("Did not find msg sent to main task"),
3352
        };
3353

3354
        assert_eq!(
3355
            1,
3356
            state_lock.lock_guard().await.net.peer_map.len(),
3357
            "One peer must remain in peer list after peer_1 closed gracefully"
3358
        );
3359

3360
        Ok(())
3361
    }
3362

3363
    #[traced_test]
3364
    #[apply(shared_tokio_runtime)]
3365
    async fn receive_transaction_request() {
3366
        let network = Network::Main;
3367
        let dummy_tx = invalid_empty_single_proof_transaction();
3368
        let txid = dummy_tx.kernel.txid();
3369

3370
        for transaction_is_known in [false, true] {
3371
            let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3372
                get_test_genesis_setup(network, 1, cli_args::Args::default())
3373
                    .await
3374
                    .unwrap();
3375
            if transaction_is_known {
3376
                state_lock
3377
                    .lock_guard_mut()
3378
                    .await
3379
                    .mempool_insert(dummy_tx.clone(), TransactionOrigin::Own)
3380
                    .await;
3381
            }
3382

3383
            let mock = if transaction_is_known {
3384
                Mock::new(vec![
3385
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3386
                    Action::Write(PeerMessage::Transaction(Box::new(
3387
                        (&dummy_tx).try_into().unwrap(),
3388
                    ))),
3389
                    Action::Read(PeerMessage::Bye),
3390
                ])
3391
            } else {
3392
                Mock::new(vec![
3393
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3394
                    Action::Read(PeerMessage::Bye),
3395
                ])
3396
            };
3397

3398
            let hsd = get_dummy_handshake_data_for_genesis(network);
3399
            let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3400
            let mut peer_loop_handler = PeerLoopHandler::new(
3401
                to_main_tx,
3402
                state_lock,
3403
                get_dummy_socket_address(0),
3404
                hsd,
3405
                true,
3406
                1,
3407
            );
3408

3409
            peer_loop_handler
3410
                .run(mock, from_main_rx, &mut peer_state)
3411
                .await
3412
                .unwrap();
3413
        }
3414
    }
3415

3416
    #[traced_test]
3417
    #[apply(shared_tokio_runtime)]
3418
    async fn empty_mempool_request_tx_test() {
3419
        // In this scenario the client receives a transaction notification from
3420
        // a peer of a transaction it doesn't know; the client must then request it.
3421

3422
        let network = Network::Main;
3423
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
3424
            get_test_genesis_setup(network, 1, cli_args::Args::default())
3425
                .await
3426
                .unwrap();
3427

3428
        let spending_key = state_lock
3429
            .lock_guard()
3430
            .await
3431
            .wallet_state
3432
            .wallet_entropy
3433
            .nth_symmetric_key_for_tests(0);
3434
        let genesis_block = Block::genesis(network);
3435
        let now = genesis_block.kernel.header.timestamp;
3436
        let config = TxCreationConfig::default()
3437
            .recover_change_off_chain(spending_key.into())
3438
            .with_prover_capability(TxProvingCapability::ProofCollection);
3439
        let transaction_1: Transaction = state_lock
3440
            .api()
3441
            .tx_initiator_internal()
3442
            .create_transaction(
3443
                Default::default(),
3444
                NativeCurrencyAmount::coins(0),
3445
                now,
3446
                config,
3447
            )
3448
            .await
3449
            .unwrap()
3450
            .transaction
3451
            .into();
3452

3453
        // Build the resulting transaction notification
3454
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3455
        let mock = Mock::new(vec![
3456
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3457
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3458
            Action::Read(PeerMessage::Transaction(Box::new(
3459
                (&transaction_1).try_into().unwrap(),
3460
            ))),
3461
            Action::Read(PeerMessage::Bye),
3462
        ]);
3463

3464
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3465

3466
        // Mock a timestamp to allow transaction to be considered valid
3467
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3468
            to_main_tx,
3469
            state_lock.clone(),
3470
            get_dummy_socket_address(0),
3471
            hsd_1.clone(),
3472
            true,
3473
            1,
3474
            now,
3475
        );
3476

3477
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3478

3479
        assert!(
3480
            state_lock.lock_guard().await.mempool.is_empty(),
3481
            "Mempool must be empty at init"
3482
        );
3483
        peer_loop_handler
3484
            .run(mock, from_main_rx_clone, &mut peer_state)
3485
            .await
3486
            .unwrap();
3487

3488
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3489
        // by the `main_loop`.
3490
        match to_main_rx1.recv().await {
3491
            Some(PeerTaskToMain::Transaction(_)) => (),
3492
            _ => panic!("Must receive remove of peer block max height"),
3493
        };
3494
    }
3495

3496
    #[traced_test]
3497
    #[apply(shared_tokio_runtime)]
3498
    async fn populated_mempool_request_tx_test() -> Result<()> {
3499
        // In this scenario the peer is informed of a transaction that it already knows
3500

3501
        let network = Network::Main;
3502
        let (
3503
            _peer_broadcast_tx,
3504
            from_main_rx_clone,
3505
            to_main_tx,
3506
            mut to_main_rx1,
3507
            mut state_lock,
3508
            _hsd,
3509
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3510
            .await
3511
            .unwrap();
3512
        let spending_key = state_lock
3513
            .lock_guard()
3514
            .await
3515
            .wallet_state
3516
            .wallet_entropy
3517
            .nth_symmetric_key_for_tests(0);
3518

3519
        let genesis_block = Block::genesis(network);
3520
        let now = genesis_block.kernel.header.timestamp;
3521
        let config = TxCreationConfig::default()
3522
            .recover_change_off_chain(spending_key.into())
3523
            .with_prover_capability(TxProvingCapability::ProofCollection);
3524
        let transaction_1: Transaction = state_lock
3525
            .api()
3526
            .tx_initiator_internal()
3527
            .create_transaction(
3528
                Default::default(),
3529
                NativeCurrencyAmount::coins(0),
3530
                now,
3531
                config,
3532
            )
3533
            .await
3534
            .unwrap()
3535
            .transaction
3536
            .into();
3537

3538
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3539
        let mut peer_loop_handler = PeerLoopHandler::new(
3540
            to_main_tx,
3541
            state_lock.clone(),
3542
            get_dummy_socket_address(0),
3543
            hsd_1.clone(),
3544
            true,
3545
            1,
3546
        );
3547
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3548

3549
        assert!(
3550
            state_lock.lock_guard().await.mempool.is_empty(),
3551
            "Mempool must be empty at init"
3552
        );
3553
        state_lock
3554
            .lock_guard_mut()
3555
            .await
3556
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
3557
            .await;
3558
        assert!(
3559
            !state_lock.lock_guard().await.mempool.is_empty(),
3560
            "Mempool must be non-empty after insertion"
3561
        );
3562

3563
        // Run the peer loop and verify expected exchange -- namely that the
3564
        // tx notification is received and the the transaction is *not*
3565
        // requested.
3566
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3567
        let mock = Mock::new(vec![
3568
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3569
            Action::Read(PeerMessage::Bye),
3570
        ]);
3571
        peer_loop_handler
3572
            .run(mock, from_main_rx_clone, &mut peer_state)
3573
            .await
3574
            .unwrap();
3575

3576
        // nothing is allowed to be sent to `main_loop`
3577
        match to_main_rx1.try_recv() {
3578
            Err(TryRecvError::Empty) => (),
3579
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3580
            Ok(_) => panic!("to_main channel must be empty"),
3581
        };
3582
        Ok(())
3583
    }
3584

3585
    mod block_proposals {
3586
        use super::*;
3587
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3588

3589
        struct TestSetup {
3590
            peer_loop_handler: PeerLoopHandler,
3591
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3592
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3593
            peer_state: MutablePeerState,
3594
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3595
            genesis_block: Block,
3596
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3597
        }
3598

3599
        async fn genesis_setup(network: Network) -> TestSetup {
3600
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
3601
                get_test_genesis_setup(network, 0, cli_args::Args::default())
3602
                    .await
3603
                    .unwrap();
3604
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
3605
            let peer_loop_handler = PeerLoopHandler::new(
3606
                to_main_tx.clone(),
3607
                alice.clone(),
3608
                get_dummy_socket_address(0),
3609
                peer_hsd.clone(),
3610
                true,
3611
                1,
3612
            );
3613
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
3614

3615
            // (peer_loop_handler, to_main_rx1)
3616
            TestSetup {
3617
                peer_broadcast_tx,
3618
                peer_loop_handler,
3619
                to_main_rx,
3620
                from_main_rx,
3621
                peer_state,
3622
                to_main_tx,
3623
                genesis_block: Block::genesis(network),
3624
            }
3625
        }
3626

3627
        #[traced_test]
3628
        #[apply(shared_tokio_runtime)]
3629
        async fn accept_block_proposal_height_one() {
3630
            // Node knows genesis block, receives a block proposal for block 1
3631
            // and must accept this. Verify that main loop is informed of block
3632
            // proposal.
3633
            let TestSetup {
3634
                peer_broadcast_tx,
3635
                mut peer_loop_handler,
3636
                mut to_main_rx,
3637
                from_main_rx,
3638
                mut peer_state,
3639
                to_main_tx,
3640
                genesis_block,
3641
            } = genesis_setup(Network::Main).await;
3642
            let block1 = fake_valid_block_for_tests(
3643
                &peer_loop_handler.global_state_lock,
3644
                StdRng::seed_from_u64(5550001).random(),
3645
            )
3646
            .await;
3647

3648
            let mock = Mock::new(vec![
3649
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
3650
                Action::Read(PeerMessage::Bye),
3651
            ]);
3652
            peer_loop_handler
3653
                .run(mock, from_main_rx, &mut peer_state)
3654
                .await
3655
                .unwrap();
3656

3657
            match to_main_rx.try_recv().unwrap() {
3658
                PeerTaskToMain::BlockProposal(block) => {
3659
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
3660
                }
3661
                _ => panic!("Expected main loop to be informed of block proposal"),
3662
            };
3663

3664
            drop(to_main_tx);
3665
            drop(peer_broadcast_tx);
3666
        }
3667

3668
        #[traced_test]
3669
        #[apply(shared_tokio_runtime)]
3670
        async fn accept_block_proposal_notification_height_one() {
3671
            // Node knows genesis block, receives a block proposal notification
3672
            // for block 1 and must accept this by requesting the block
3673
            // proposal from peer.
3674
            let TestSetup {
3675
                peer_broadcast_tx,
3676
                mut peer_loop_handler,
3677
                to_main_rx: _,
3678
                from_main_rx,
3679
                mut peer_state,
3680
                to_main_tx,
3681
                ..
3682
            } = genesis_setup(Network::Main).await;
3683
            let block1 = fake_valid_block_for_tests(
3684
                &peer_loop_handler.global_state_lock,
3685
                StdRng::seed_from_u64(5550001).random(),
3686
            )
3687
            .await;
3688

3689
            let mock = Mock::new(vec![
3690
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
3691
                Action::Write(PeerMessage::BlockProposalRequest(
3692
                    BlockProposalRequest::new(block1.body().mast_hash()),
3693
                )),
3694
                Action::Read(PeerMessage::Bye),
3695
            ]);
3696
            peer_loop_handler
3697
                .run(mock, from_main_rx, &mut peer_state)
3698
                .await
3699
                .unwrap();
3700

3701
            drop(to_main_tx);
3702
            drop(peer_broadcast_tx);
3703
        }
3704
    }
3705

3706
    mod proof_qualities {
3707
        use strum::IntoEnumIterator;
3708

3709
        use super::*;
3710
        use crate::config_models::cli_args;
3711
        use crate::models::blockchain::transaction::Transaction;
3712
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3713
        use crate::models::state::wallet::transaction_output::TxOutput;
3714
        use crate::tests::shared::mock_genesis_global_state;
3715

3716
        async fn tx_of_proof_quality(
3717
            network: Network,
3718
            quality: TransactionProofQuality,
3719
        ) -> Transaction {
3720
            let wallet_secret = WalletEntropy::devnet_wallet();
3721
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
3722
            let alice_gsl = mock_genesis_global_state(
3723
                1,
3724
                wallet_secret,
3725
                cli_args::Args::default_with_network(network),
3726
            )
3727
            .await;
3728
            let alice = alice_gsl.lock_guard().await;
3729
            let genesis_block = alice.chain.light_state();
3730
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
3731
            let prover_capability = match quality {
3732
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
3733
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
3734
            };
3735
            let config = TxCreationConfig::default()
3736
                .recover_change_off_chain(alice_key.into())
3737
                .with_prover_capability(prover_capability);
3738
            alice_gsl
3739
                .api()
3740
                .tx_initiator_internal()
3741
                .create_transaction(
3742
                    Vec::<TxOutput>::new().into(),
3743
                    NativeCurrencyAmount::coins(1),
3744
                    in_seven_months,
3745
                    config,
3746
                )
3747
                .await
3748
                .unwrap()
3749
                .transaction()
3750
                .clone()
3751
        }
3752

3753
        #[traced_test]
3754
        #[apply(shared_tokio_runtime)]
3755
        async fn client_favors_higher_proof_quality() {
3756
            // In this scenario the peer is informed of a transaction that it
3757
            // already knows, and it's tested that it checks the proof quality
3758
            // field and verifies that it exceeds the proof in the mempool
3759
            // before requesting the transasction.
3760
            let network = Network::Main;
3761
            let proof_collection_tx =
3762
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
3763
            let single_proof_tx =
3764
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
3765

3766
            for (own_tx_pq, new_tx_pq) in
3767
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
3768
            {
3769
                use TransactionProofQuality::*;
3770

3771
                let (
3772
                    _peer_broadcast_tx,
3773
                    from_main_rx_clone,
3774
                    to_main_tx,
3775
                    mut to_main_rx1,
3776
                    mut alice,
3777
                    handshake_data,
3778
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3779
                    .await
3780
                    .unwrap();
3781

3782
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
3783
                    (ProofCollection, ProofCollection) => {
3784
                        (&proof_collection_tx, &proof_collection_tx)
3785
                    }
3786
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
3787
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
3788
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
3789
                };
3790

3791
                alice
3792
                    .lock_guard_mut()
3793
                    .await
3794
                    .mempool_insert((*own_tx).to_owned(), TransactionOrigin::Foreign)
3795
                    .await;
3796

3797
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
3798

3799
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
3800
                let mock = if own_proof_is_supreme {
3801
                    Mock::new(vec![
3802
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3803
                        Action::Read(PeerMessage::Bye),
3804
                    ])
3805
                } else {
3806
                    Mock::new(vec![
3807
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3808
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3809
                        Action::Read(PeerMessage::Transaction(Box::new(
3810
                            new_tx.try_into().unwrap(),
3811
                        ))),
3812
                        Action::Read(PeerMessage::Bye),
3813
                    ])
3814
                };
3815

3816
                let now = proof_collection_tx.kernel.timestamp;
3817
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3818
                    to_main_tx,
3819
                    alice.clone(),
3820
                    get_dummy_socket_address(0),
3821
                    handshake_data.clone(),
3822
                    true,
3823
                    1,
3824
                    now,
3825
                );
3826
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
3827

3828
                peer_loop_handler
3829
                    .run(mock, from_main_rx_clone, &mut peer_state)
3830
                    .await
3831
                    .unwrap();
3832

3833
                if own_proof_is_supreme {
3834
                    match to_main_rx1.try_recv() {
3835
                        Err(TryRecvError::Empty) => (),
3836
                        Err(TryRecvError::Disconnected) => {
3837
                            panic!("to_main channel must still be open")
3838
                        }
3839
                        Ok(_) => panic!("to_main channel must be empty"),
3840
                    }
3841
                } else {
3842
                    match to_main_rx1.try_recv() {
3843
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
3844
                        Err(TryRecvError::Disconnected) => {
3845
                            panic!("to_main channel must still be open")
3846
                        }
3847
                        Ok(PeerTaskToMain::Transaction(_)) => (),
3848
                        _ => panic!("Unexpected result from channel"),
3849
                    }
3850
                }
3851
            }
3852
        }
3853
    }
3854

3855
    mod sync_challenges {
3856
        use super::*;
3857
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3858

3859
        #[traced_test]
3860
        #[apply(shared_tokio_runtime)]
3861
        async fn bad_sync_challenge_height_greater_than_tip() {
3862
            // Criterium: Challenge height may not exceed that of tip in the
3863
            // request.
3864

3865
            let network = Network::Main;
3866
            let (
3867
                _alice_main_to_peer_tx,
3868
                alice_main_to_peer_rx,
3869
                alice_peer_to_main_tx,
3870
                alice_peer_to_main_rx,
3871
                mut alice,
3872
                alice_hsd,
3873
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3874
                .await
3875
                .unwrap();
3876
            let genesis_block: Block = Block::genesis(network);
3877
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
3878
                &genesis_block,
3879
                Timestamp::hours(1),
3880
                Default::default(),
3881
                network,
3882
            )
3883
            .await;
3884
            for block in &blocks {
3885
                alice.set_new_tip(block.clone()).await.unwrap();
3886
            }
3887

3888
            let bh12 = blocks.last().unwrap().header().height;
3889
            let sync_challenge = SyncChallenge {
3890
                tip_digest: blocks[9].hash(),
3891
                challenges: [bh12; 10],
3892
            };
3893
            let alice_p2p_messages = Mock::new(vec![
3894
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
3895
                Action::Read(PeerMessage::Bye),
3896
            ]);
3897

3898
            let peer_address = get_dummy_socket_address(0);
3899
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
3900
                alice_peer_to_main_tx.clone(),
3901
                alice.clone(),
3902
                peer_address,
3903
                alice_hsd,
3904
                false,
3905
                1,
3906
            );
3907
            alice_peer_loop_handler
3908
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
3909
                .await
3910
                .unwrap();
3911

3912
            drop(alice_peer_to_main_rx);
3913

3914
            let latest_sanction = alice
3915
                .lock_guard()
3916
                .await
3917
                .net
3918
                .get_peer_standing_from_database(peer_address.ip())
3919
                .await
3920
                .unwrap();
3921
            assert_eq!(
3922
                NegativePeerSanction::InvalidSyncChallenge,
3923
                latest_sanction
3924
                    .latest_punishment
3925
                    .expect("peer must be sanctioned")
3926
                    .0
3927
            );
3928
        }
3929

3930
        #[traced_test]
3931
        #[apply(shared_tokio_runtime)]
3932
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
3933
            // Criterium: Challenge may not point to genesis block, or block 1, as
3934
            // tip.
3935

3936
            let network = Network::Main;
3937
            let genesis_block: Block = Block::genesis(network);
3938

3939
            let alice_cli = cli_args::Args::default();
3940
            let (
3941
                _alice_main_to_peer_tx,
3942
                alice_main_to_peer_rx,
3943
                alice_peer_to_main_tx,
3944
                alice_peer_to_main_rx,
3945
                alice,
3946
                alice_hsd,
3947
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
3948

3949
            let sync_challenge = SyncChallenge {
3950
                tip_digest: genesis_block.hash(),
3951
                challenges: [BlockHeight::genesis(); 10],
3952
            };
3953

3954
            let alice_p2p_messages = Mock::new(vec![
3955
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
3956
                Action::Read(PeerMessage::Bye),
3957
            ]);
3958

3959
            let peer_address = get_dummy_socket_address(0);
3960
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
3961
                alice_peer_to_main_tx.clone(),
3962
                alice.clone(),
3963
                peer_address,
3964
                alice_hsd,
3965
                false,
3966
                1,
3967
            );
3968
            alice_peer_loop_handler
3969
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
3970
                .await
3971
                .unwrap();
3972

3973
            drop(alice_peer_to_main_rx);
3974

3975
            let latest_sanction = alice
3976
                .lock_guard()
3977
                .await
3978
                .net
3979
                .get_peer_standing_from_database(peer_address.ip())
3980
                .await
3981
                .unwrap();
3982
            assert_eq!(
3983
                NegativePeerSanction::InvalidSyncChallenge,
3984
                latest_sanction
3985
                    .latest_punishment
3986
                    .expect("peer must be sanctioned")
3987
                    .0
3988
            );
3989
        }
3990

3991
        #[traced_test]
3992
        #[apply(shared_tokio_runtime)]
3993
        async fn sync_challenge_happy_path() -> Result<()> {
3994
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
3995
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
3996
            // sync mode.
3997

3998
            let mut rng = rand::rng();
3999
            let network = Network::Main;
4000
            let genesis_block: Block = Block::genesis(network);
4001

4002
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
4003
            let alice_cli = cli_args::Args {
4004
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
4005
                ..Default::default()
4006
            };
4007
            let (
4008
                _alice_main_to_peer_tx,
4009
                alice_main_to_peer_rx,
4010
                alice_peer_to_main_tx,
4011
                mut alice_peer_to_main_rx,
4012
                mut alice,
4013
                alice_hsd,
4014
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
4015
            let _alice_socket_address = get_dummy_socket_address(0);
4016

4017
            let (
4018
                _bob_main_to_peer_tx,
4019
                _bob_main_to_peer_rx,
4020
                _bob_peer_to_main_tx,
4021
                _bob_peer_to_main_rx,
4022
                mut bob,
4023
                _bob_hsd,
4024
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
4025
            let bob_socket_address = get_dummy_socket_address(0);
4026

4027
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
4028
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
4029
            assert!(
4030
                block_1.is_valid(&genesis_block, now, network).await,
4031
                "Block must be valid for this test to make sense"
4032
            );
4033
            let alice_tip = &block_1;
4034
            alice.set_new_tip(block_1.clone()).await?;
4035
            bob.set_new_tip(block_1.clone()).await?;
4036

4037
            // produce enough blocks to ensure alice needs to go into sync mode
4038
            // with this block notification.
4039
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
4040
                &block_1,
4041
                network.target_block_interval(),
4042
                (0..rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20))
4043
                    .map(|_| rng.random())
4044
                    .collect_vec(),
4045
                network,
4046
            )
4047
            .await;
4048
            for block in &blocks {
4049
                bob.set_new_tip(block.clone()).await?;
4050
            }
4051
            let bob_tip = blocks.last().unwrap();
4052

4053
            let block_notification_from_bob = PeerBlockNotification {
4054
                hash: bob_tip.hash(),
4055
                height: bob_tip.header().height,
4056
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
4057
            };
4058

4059
            let alice_rng_seed = rng.random::<[u8; 32]>();
4060
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
4061
            let sync_challenge_from_alice = SyncChallenge::generate(
4062
                &block_notification_from_bob,
4063
                alice_tip.header().height,
4064
                alice_rng_clone.random(),
4065
            );
4066

4067
            println!(
4068
                "sync challenge from alice:\n{:?}",
4069
                sync_challenge_from_alice
4070
            );
4071

4072
            let sync_challenge_response_from_bob = bob
4073
                .lock_guard()
4074
                .await
4075
                .response_to_sync_challenge(sync_challenge_from_alice)
4076
                .await
4077
                .expect("should be able to respond to sync challenge");
4078

4079
            let alice_p2p_messages = Mock::new(vec![
4080
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4081
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
4082
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
4083
                    sync_challenge_response_from_bob,
4084
                ))),
4085
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4086
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
4087
                // The absence of a Write here checks that a 2nd challenge isn't sent
4088
                // when a successful was just received.
4089
                Action::Read(PeerMessage::Bye),
4090
            ]);
4091

4092
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
4093
                alice_peer_to_main_tx.clone(),
4094
                alice.clone(),
4095
                bob_socket_address,
4096
                alice_hsd,
4097
                false,
4098
                1,
4099
                bob_tip.header().timestamp,
4100
            );
4101
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
4102
            alice_peer_loop_handler
4103
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4104
                .await?;
4105

4106
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
4107
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
4108
            expected_anchor_mmra.append(bob_tip.hash());
4109
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
4110
                peer_address: bob_socket_address,
4111
                claimed_height: bob_tip.header().height,
4112
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
4113
                claimed_block_mmra: expected_anchor_mmra,
4114
            };
4115
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
4116
            assert_eq!(
4117
                expected_message_from_alice_peer_loop,
4118
                observed_message_from_alice_peer_loop
4119
            );
4120

4121
            Ok(())
4122
        }
4123
    }
4124
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc