• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 16567754698

28 Jul 2025 11:22AM UTC coverage: 74.102% (-0.04%) from 74.138%
16567754698

push

github

Sword-Smith
chore: Update TVM benchmark data

The updated benchmarks show that the cycle count for hashing the removal
records has decreased by ~25 %. This is due to the better representation
of absolute indices introduced in
e57c6a1de. Note that only the index sets
are hashed, the MMR authentication paths are not.

22902 of 30906 relevant lines covered (74.1%)

631000.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.64
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::consensus_rule_set::ConsensusRuleSet;
38
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
39
use crate::models::blockchain::transaction::Transaction;
40
use crate::models::channel::MainToPeerTask;
41
use crate::models::channel::PeerTaskToMain;
42
use crate::models::channel::PeerTaskToMainTransaction;
43
use crate::models::peer::handshake_data::HandshakeData;
44
use crate::models::peer::peer_info::PeerConnectionInfo;
45
use crate::models::peer::peer_info::PeerInfo;
46
use crate::models::peer::transfer_block::TransferBlock;
47
use crate::models::peer::BlockProposalRequest;
48
use crate::models::peer::BlockRequestBatch;
49
use crate::models::peer::IssuedSyncChallenge;
50
use crate::models::peer::MutablePeerState;
51
use crate::models::peer::NegativePeerSanction;
52
use crate::models::peer::PeerMessage;
53
use crate::models::peer::PeerSanction;
54
use crate::models::peer::PeerStanding;
55
use crate::models::peer::PositivePeerSanction;
56
use crate::models::peer::SyncChallenge;
57
use crate::models::proof_abstractions::mast_hash::MastHash;
58
use crate::models::proof_abstractions::timestamp::Timestamp;
59
use crate::models::state::block_proposal::BlockProposalRejectError;
60
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
61
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
62
use crate::models::state::GlobalState;
63
use crate::models::state::GlobalStateLock;
64
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
65

66
// NOTE(review): not referenced in the visible part of this file; presumably
// the default number of blocks in a batch request/response — confirm.
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
67
// Upper bound on the number of peers put into a `PeerListResponse`. Incoming
// peer lists longer than this are punished as flooding.
const MAX_PEER_LIST_LENGTH: usize = 10;
68
// NOTE(review): not referenced in the visible part of this file; presumably
// the smallest accepted batch size — confirm.
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
69

70
// Named return values for the peer message handler, where `Ok(true)` means
// that the connection should be closed.
const KEEP_CONNECTION_ALIVE: bool = false;
71
const DISCONNECT_CONNECTION: bool = true;
72

73
// NOTE(review): presumably the numeric type of a peer's standing score —
// confirm against `PeerStanding`.
pub type PeerStandingNumber = i32;
74

75
/// Handles messages from peers via TCP.
76
///
77
/// Also handles messages from the main task over the main-to-peer-tasks
78
/// broadcast channel.
79
#[derive(Debug, Clone)]
80
pub struct PeerLoopHandler {
81
    /// Channel for sending messages (e.g. validated blocks) to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
82
    /// Handle to the global state; locked for read via `lock_guard()` and for
    /// write via `lock_guard_mut()`.
    global_state_lock: GlobalStateLock,
83
    /// Socket address of the connected peer. Used as the key into the peer
    /// map when sanctioning the peer.
    peer_address: SocketAddr,
84
    /// Handshake data for this peer, as supplied to the constructor.
    peer_handshake_data: HandshakeData,
85
    /// NOTE(review): presumably `true` if the peer initiated the connection —
    /// confirm against the caller.
    inbound_connection: bool,
86
    /// Distance of this peer. Peers revealed by this peer are reported to the
    /// main task with distance `distance + 1`.
    distance: u8,
87
    /// Source of randomness for generating sync challenges. Can be replaced in
    /// tests via `set_rng`.
    rng: StdRng,
88
    /// If set, `now()` returns this timestamp instead of the current time.
    #[cfg(test)]
89
    mock_now: Option<Timestamp>,
90
}
91

92
impl PeerLoopHandler {
93
    pub(crate) fn new(
29✔
94
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
29✔
95
        global_state_lock: GlobalStateLock,
29✔
96
        peer_address: SocketAddr,
29✔
97
        peer_handshake_data: HandshakeData,
29✔
98
        inbound_connection: bool,
29✔
99
        distance: u8,
29✔
100
    ) -> Self {
29✔
101
        Self {
29✔
102
            to_main_tx,
29✔
103
            global_state_lock,
29✔
104
            peer_address,
29✔
105
            peer_handshake_data,
29✔
106
            inbound_connection,
29✔
107
            distance,
29✔
108
            rng: StdRng::from_rng(&mut rand::rng()),
29✔
109
            #[cfg(test)]
29✔
110
            mock_now: None,
29✔
111
        }
29✔
112
    }
29✔
113

114
    /// Test-only constructor that pins the handler's clock to `mocked_time`,
    /// so that time-dependent behavior can be tested deterministically.
    ///
    /// Apart from the mocked time, the handler is set up exactly like one
    /// created through [`Self::new`].
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);

        handler
    }
22✔
136

137
    /// Replace this handler's random number generator with `rng`.
    ///
    /// Test-only. Passing a generator with a known seed makes the handler's
    /// random choices reproducible.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        self.rng = rng;
    }
1✔
144

145
    fn now(&self) -> Timestamp {
56✔
146
        #[cfg(not(test))]
147
        {
148
            Timestamp::now()
27✔
149
        }
150
        #[cfg(test)]
151
        {
152
            self.mock_now.unwrap_or(Timestamp::now())
29✔
153
        }
154
    }
56✔
155

156
    /// Punish a peer for bad behavior.
157
    ///
158
    /// Return `Err` if the peer in question is (now) banned.
159
    ///
160
    /// # Locking:
161
    ///   * acquires `global_state_lock` for write
162
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
7✔
163
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
7✔
164
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
7✔
165
        debug!(
7✔
166
            "Peer standing before punishment is {}",
×
167
            global_state_mut
×
168
                .net
×
169
                .peer_map
×
170
                .get(&self.peer_address)
×
171
                .unwrap()
×
172
                .standing
173
        );
174

175
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
7✔
176
            bail!("Could not read peer map.");
×
177
        };
178
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
7✔
179
        if let Err(err) = sanction_result {
7✔
180
            warn!("Banning peer: {err}");
1✔
181
        }
6✔
182

183
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
7✔
184
    }
7✔
185

186
    /// Reward a peer for good behavior.
187
    ///
188
    /// Return `Err` if the peer in question is banned.
189
    ///
190
    /// # Locking:
191
    ///   * acquires `global_state_lock` for write
192
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
193
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
194
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
195
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
196
            error!("Could not read peer map.");
1✔
197
            return Ok(());
1✔
198
        };
199
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
200
        if sanction_result.is_err() {
18✔
201
            error!("Cannot reward banned peer");
×
202
        }
18✔
203

204
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
205
    }
19✔
206

207
    /// Construct a batch response, with blocks and their MMR membership proofs
208
    /// relative to a specified anchor.
209
    ///
210
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
211
    /// a block height of the response exceeds own tip height.
212
    async fn batch_response(
16✔
213
        state: &GlobalState,
16✔
214
        blocks: Vec<Block>,
16✔
215
        anchor: &MmrAccumulator,
16✔
216
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
217
        let own_tip_height = state.chain.light_state().header().height;
16✔
218
        let block_heights_match_anchor = blocks
16✔
219
            .iter()
16✔
220
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
221
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
222
        if !block_heights_match_anchor || !block_heights_known {
16✔
223
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
224
                Some(height) => height.to_string(),
×
225
                None => "None".to_owned(),
×
226
            };
227

228
            debug!("max_block_height: {max_block_height}");
×
229
            debug!("own_tip_height: {own_tip_height}");
×
230
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
231
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
232
            debug!("block_heights_known: {block_heights_known}");
×
233
            return None;
×
234
        }
16✔
235

236
        let mut ret = vec![];
16✔
237
        for block in blocks {
62✔
238
            let mmr_mp = state
46✔
239
                .chain
46✔
240
                .archival_state()
46✔
241
                .archival_block_mmr
46✔
242
                .ammr()
46✔
243
                .prove_membership_relative_to_smaller_mmr(
46✔
244
                    block.header().height.into(),
46✔
245
                    anchor.num_leafs(),
46✔
246
                )
46✔
247
                .await;
46✔
248
            let block: TransferBlock = block.try_into().unwrap();
46✔
249
            ret.push((block, mmr_mp));
46✔
250
        }
251

252
        Some(ret)
16✔
253
    }
16✔
254

255
    /// Handle validation and send all blocks to the main task if they're all
256
    /// valid. Use with a list of blocks or a single block. When the
257
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
258
    /// list is the `i`th block. The parent of element zero in this list is
259
    /// `parent_of_first_block`.
260
    ///
261
    /// # Return Value
262
    ///  - `Err` when the connection should be closed;
263
    ///  - `Ok(None)` if some block is invalid
264
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
265
    ///    are not syncing;
266
    ///  - `Ok(None)` if the last block has insufficient height and we are
267
    ///    syncing;
268
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
269
    ///    highest height in the batch.
270
    ///
271
    /// A return value of Ok(Some(_)) means that the message was passed on to
272
    /// main loop.
273
    ///
274
    /// # Locking
275
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
276
    ///     `self.reward(..)`.
277
    ///
278
    /// # Panics
279
    ///
280
    ///  - Panics if called with the empty list.
281
    async fn handle_blocks(
20✔
282
        &mut self,
20✔
283
        received_blocks: Vec<Block>,
20✔
284
        parent_of_first_block: Block,
20✔
285
    ) -> Result<Option<BlockHeight>> {
20✔
286
        debug!(
20✔
287
            "attempting to validate {} {}",
×
288
            received_blocks.len(),
×
289
            if received_blocks.len() == 1 {
×
290
                "block"
×
291
            } else {
292
                "blocks"
×
293
            }
294
        );
295
        let now = self.now();
20✔
296
        debug!("validating with respect to current timestamp {now}");
20✔
297
        let mut previous_block = &parent_of_first_block;
20✔
298
        for new_block in &received_blocks {
48✔
299
            let new_block_has_proof_of_work = new_block.has_proof_of_work(
29✔
300
                self.global_state_lock.cli().network,
29✔
301
                previous_block.header(),
29✔
302
            );
303
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
29✔
304
            let new_block_is_valid = new_block
29✔
305
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
29✔
306
                .await;
29✔
307
            debug!("new block is valid? {new_block_is_valid}");
29✔
308
            if !new_block_has_proof_of_work {
29✔
309
                warn!(
1✔
310
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
311
                    new_block.kernel.header.height, self.peer_address
×
312
                );
313
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
314
                warn!(
1✔
315
                    "Proof of work should be {} (or more) but was [{}].",
×
316
                    previous_block.kernel.header.difficulty.target(),
×
317
                    new_block.hash().values().iter().join(", ")
×
318
                );
319
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
320
                    new_block.kernel.header.height,
1✔
321
                    new_block.hash(),
1✔
322
                )))
1✔
323
                .await?;
1✔
324
                warn!("Failed to validate block due to insufficient PoW");
1✔
325
                return Ok(None);
1✔
326
            } else if !new_block_is_valid {
28✔
327
                warn!(
×
328
                    "Received invalid block of height {} from peer with IP {}",
×
329
                    new_block.kernel.header.height, self.peer_address
×
330
                );
331
                self.punish(NegativePeerSanction::InvalidBlock((
×
332
                    new_block.kernel.header.height,
×
333
                    new_block.hash(),
×
334
                )))
×
335
                .await?;
×
336
                warn!("Failed to validate block: invalid block");
×
337
                return Ok(None);
×
338
            }
28✔
339
            info!(
28✔
340
                "Block with height {} is valid. mined: {}",
×
341
                new_block.kernel.header.height,
×
342
                new_block.kernel.header.timestamp.standard_format()
×
343
            );
344

345
            previous_block = new_block;
28✔
346
        }
347

348
        // evaluate the fork choice rule
349
        debug!("Checking last block's canonicity ...");
19✔
350
        let last_block = received_blocks.last().unwrap();
19✔
351
        let is_canonical = self
19✔
352
            .global_state_lock
19✔
353
            .lock_guard()
19✔
354
            .await
19✔
355
            .incoming_block_is_more_canonical(last_block);
19✔
356
        let last_block_height = last_block.header().height;
19✔
357
        let sync_mode_active_and_have_new_champion = self
19✔
358
            .global_state_lock
19✔
359
            .lock_guard()
19✔
360
            .await
19✔
361
            .net
362
            .sync_anchor
363
            .as_ref()
19✔
364
            .is_some_and(|x| {
19✔
365
                x.champion
×
366
                    .is_none_or(|(height, _)| height < last_block_height)
×
367
            });
×
368
        if !is_canonical && !sync_mode_active_and_have_new_champion {
19✔
369
            warn!(
1✔
370
                "Received {} blocks from peer but incoming blocks are less \
×
371
            canonical than current tip, or current sync-champion.",
×
372
                received_blocks.len()
×
373
            );
374
            return Ok(None);
1✔
375
        }
18✔
376

377
        // Send the new blocks to the main task which handles the state update
378
        // and storage to the database.
379
        let number_of_received_blocks = received_blocks.len();
18✔
380
        self.to_main_tx
18✔
381
            .send(PeerTaskToMain::NewBlocks(received_blocks))
18✔
382
            .await?;
18✔
383
        info!(
18✔
384
            "Updated block info by block from peer. block height {}",
×
385
            last_block_height
386
        );
387

388
        // Valuable, new, hard-to-produce information. Reward peer.
389
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
18✔
390
            .await?;
18✔
391

392
        Ok(Some(last_block_height))
18✔
393
    }
20✔
394

395
    /// Take a single block received from a peer and (attempt to) find a path
396
    /// between the received block and a common ancestor stored in the blocks
397
    /// database.
398
    ///
399
    /// This function attempts to find the parent of the received block, either
400
    /// by searching the database or by requesting it from a peer.
401
    ///  - If the parent is not stored, it is requested from the peer and the
402
    ///    received block is pushed to the fork reconciliation list for later
403
    ///    handling by this function. The fork reconciliation list starts out
404
    ///    empty, but grows as more parents are requested and transmitted.
405
    ///  - If the parent is found in the database, a) block handling continues:
406
    ///    the entire list of fork reconciliation blocks are passed down the
407
    ///    pipeline, potentially leading to a state update; and b) the fork
408
    ///    reconciliation list is cleared.
409
    ///
410
    /// Locking:
411
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
412
    ///     `self.reward(..)`.
413
    async fn try_ensure_path<S>(
32✔
414
        &mut self,
32✔
415
        received_block: Box<Block>,
32✔
416
        peer: &mut S,
32✔
417
        peer_state: &mut MutablePeerState,
32✔
418
    ) -> Result<()>
32✔
419
    where
32✔
420
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
32✔
421
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
32✔
422
        <S as TryStream>::Error: std::error::Error,
32✔
423
    {
32✔
424
        // Does the received block match the fork reconciliation list?
425
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
32✔
426
            peer_state.fork_reconciliation_blocks.last()
32✔
427
        {
428
            let valid = successor
10✔
429
                .is_valid(
10✔
430
                    received_block.as_ref(),
10✔
431
                    self.now(),
10✔
432
                    self.global_state_lock.cli().network,
10✔
433
                )
10✔
434
                .await;
10✔
435
            if !valid {
10✔
436
                warn!(
×
437
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
438
                        peer_state.fork_reconciliation_blocks.len() + 1
×
439
                    );
440
            }
10✔
441
            valid
10✔
442
        } else {
443
            true
22✔
444
        };
445

446
        // Are we running out of RAM?
447
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
32✔
448
            >= self.global_state_lock.cli().sync_mode_threshold;
32✔
449
        if too_many_blocks {
32✔
450
            warn!(
1✔
451
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
452
                peer_state.fork_reconciliation_blocks.len() + 1
×
453
            );
454
        }
31✔
455

456
        // Block mismatch or too many blocks: abort!
457
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
32✔
458
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
459
                received_block.header().height,
1✔
460
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
461
                received_block.hash(),
1✔
462
            )))
1✔
463
            .await?;
1✔
464
            peer_state.fork_reconciliation_blocks = vec![];
1✔
465
            return Ok(());
1✔
466
        }
31✔
467

468
        // otherwise, append
469
        peer_state.fork_reconciliation_blocks.push(*received_block);
31✔
470

471
        // Try fetch parent
472
        let received_block_header = *peer_state
31✔
473
            .fork_reconciliation_blocks
31✔
474
            .last()
31✔
475
            .unwrap()
31✔
476
            .header();
31✔
477

478
        let parent_digest = received_block_header.prev_block_digest;
31✔
479
        let parent_height = received_block_header.height.previous()
31✔
480
            .expect("transferred block must have previous height because genesis block cannot be transferred");
31✔
481
        debug!("Try ensure path: fetching parent block");
31✔
482
        let parent_block = self
31✔
483
            .global_state_lock
31✔
484
            .lock_guard()
31✔
485
            .await
31✔
486
            .chain
487
            .archival_state()
31✔
488
            .get_block(parent_digest)
31✔
489
            .await?;
31✔
490
        debug!(
31✔
491
            "Completed parent block fetching from DB: {}",
×
492
            if parent_block.is_some() {
×
493
                "found".to_string()
×
494
            } else {
495
                "not found".to_string()
×
496
            }
497
        );
498

499
        // If parent is not known (but not genesis) request it.
500
        let Some(parent_block) = parent_block else {
31✔
501
            if parent_height.is_genesis() {
11✔
502
                peer_state.fork_reconciliation_blocks.clear();
1✔
503
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
504
                return Ok(());
×
505
            }
10✔
506
            info!(
10✔
507
                "Parent not known: Requesting previous block with height {} from peer",
×
508
                parent_height
509
            );
510

511
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
512
                .await?;
10✔
513

514
            return Ok(());
10✔
515
        };
516

517
        // We want to treat the received fork reconciliation blocks (plus the
518
        // received block) in reverse order, from oldest to newest, because
519
        // they were requested from high to low block height.
520
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
20✔
521
        new_blocks.reverse();
20✔
522

523
        // Reset the fork resolution state since we got all the way back to a
524
        // block that we have.
525
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
20✔
526
        peer_state.fork_reconciliation_blocks.clear();
20✔
527

528
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
20✔
529
            // If `BlockNotification` was received during a block reconciliation
530
            // event, then the peer might have one (or more (unlikely)) blocks
531
            // that we do not have. We should thus request those blocks.
532
            if fork_reconciliation_event
18✔
533
                && peer_state.highest_shared_block_height > new_block_height
18✔
534
            {
535
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
536
                    peer_state.highest_shared_block_height,
1✔
537
                ))
1✔
538
                .await?;
1✔
539
            }
17✔
540
        }
2✔
541

542
        Ok(())
20✔
543
    }
32✔
544

545
    /// Handle a peer message. Returns `Ok(true)` if the connection should be closed.
546
    /// Connection should also be closed if an error is returned.
547
    /// Otherwise, returns `Ok(false)`.
548
    ///
549
    /// Locking:
550
    ///   * Acquires `global_state_lock` for read.
551
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
552
    ///     `self.reward(..)`.
553
    async fn handle_peer_message<S>(
154✔
554
        &mut self,
154✔
555
        msg: PeerMessage,
154✔
556
        peer: &mut S,
154✔
557
        peer_state_info: &mut MutablePeerState,
154✔
558
    ) -> Result<bool>
154✔
559
    where
154✔
560
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
154✔
561
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
154✔
562
        <S as TryStream>::Error: std::error::Error,
154✔
563
    {
154✔
564
        debug!(
154✔
565
            "Received {} from peer {}",
×
566
            msg.get_type(),
×
567
            self.peer_address
568
        );
569
        match msg {
154✔
570
            PeerMessage::Bye => {
571
                // Note that the current peer is not removed from the global_state.peer_map here
572
                // but that this is done by the caller.
573
                info!("Got bye. Closing connection to peer");
43✔
574
                Ok(DISCONNECT_CONNECTION)
43✔
575
            }
576
            PeerMessage::PeerListRequest => {
577
                let peer_info = {
5✔
578
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
579

580
                    // We are interested in the address on which peers accept ingoing connections,
581
                    // not in the address in which they are connected to us. We are only interested in
582
                    // peers that accept incoming connections.
583
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
584
                        .global_state_lock
5✔
585
                        .lock_guard()
5✔
586
                        .await
5✔
587
                        .net
588
                        .peer_map
589
                        .values()
5✔
590
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
591
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
592
                        .map(|peer_info| {
8✔
593
                            (
8✔
594
                                // unwrap is safe bc of above `filter`
8✔
595
                                peer_info.listen_address().unwrap(),
8✔
596
                                peer_info.instance_id(),
8✔
597
                            )
8✔
598
                        })
8✔
599
                        .collect();
5✔
600

601
                    // We sort the returned list, so this function is easier to test
602
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
603
                    peer_info
5✔
604
                };
605

606
                debug!("Responding with: {:?}", peer_info);
5✔
607
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
608
                Ok(KEEP_CONNECTION_ALIVE)
5✔
609
            }
610
            PeerMessage::PeerListResponse(peers) => {
3✔
611
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
612

613
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
614
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
615
                        .await?;
×
616
                }
3✔
617
                self.to_main_tx
3✔
618
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
619
                        peers,
3✔
620
                        self.peer_address,
3✔
621
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
622
                        self.distance + 1,
3✔
623
                    )))
3✔
624
                    .await?;
3✔
625
                Ok(KEEP_CONNECTION_ALIVE)
3✔
626
            }
627
            PeerMessage::BlockNotificationRequest => {
628
                debug!("Got BlockNotificationRequest");
×
629

630
                peer.send(PeerMessage::BlockNotification(
×
631
                    self.global_state_lock
×
632
                        .lock_guard()
×
633
                        .await
×
634
                        .chain
635
                        .light_state()
×
636
                        .into(),
×
637
                ))
638
                .await?;
×
639

640
                Ok(KEEP_CONNECTION_ALIVE)
×
641
            }
642
            PeerMessage::BlockNotification(block_notification) => {
16✔
643
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
644

645
                let (tip_header, sync_anchor_is_set) = {
16✔
646
                    let state = self.global_state_lock.lock_guard().await;
16✔
647
                    (
16✔
648
                        *state.chain.light_state().header(),
16✔
649
                        state.net.sync_anchor.is_some(),
16✔
650
                    )
16✔
651
                };
652
                debug!(
16✔
653
                    "Got BlockNotification of height {}. Own height is {}",
×
654
                    block_notification.height, tip_header.height
655
                );
656

657
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
658
                let now = self.now();
16✔
659
                let time_since_latest_successful_challenge = peer_state_info
16✔
660
                    .successful_sync_challenge_response_time
16✔
661
                    .map(|then| now - then);
16✔
662
                let cooldown_expired = time_since_latest_successful_challenge
16✔
663
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
664
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
665
                    &tip_header,
16✔
666
                    block_notification.height,
16✔
667
                    block_notification.cumulative_proof_of_work,
16✔
668
                    sync_mode_threshold,
16✔
669
                );
670
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
671
                    debug!("sync mode criterion satisfied.");
1✔
672

673
                    if peer_state_info.sync_challenge.is_some() {
1✔
674
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
675
                        return Ok(KEEP_CONNECTION_ALIVE);
×
676
                    }
1✔
677

678
                    info!(
1✔
679
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
680
                    );
681
                    let challenge = SyncChallenge::generate(
1✔
682
                        &block_notification,
1✔
683
                        tip_header.height,
1✔
684
                        self.rng.random(),
1✔
685
                    );
686
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
687
                        challenge,
1✔
688
                        block_notification.cumulative_proof_of_work,
1✔
689
                        self.now(),
1✔
690
                    ));
1✔
691

692
                    debug!("sending challenge ...");
1✔
693
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
694

695
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
696
                }
15✔
697

698
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
699
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
700
                    < block_notification.cumulative_proof_of_work;
15✔
701

702
                debug!("block_is_new: {}", block_is_new);
15✔
703

704
                if block_is_new
15✔
705
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
706
                    && !sync_anchor_is_set
14✔
707
                    && !exceeds_sync_mode_threshold
14✔
708
                {
709
                    debug!(
13✔
710
                        "sending BlockRequestByHeight to peer for block with height {}",
×
711
                        block_notification.height
712
                    );
713
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
714
                        .await?;
13✔
715
                } else {
716
                    debug!(
2✔
717
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
718
                        block_notification.height,
719
                        block_is_new,
720
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
721
                    );
722
                }
723

724
                Ok(KEEP_CONNECTION_ALIVE)
15✔
725
            }
726
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
727
                let response = {
×
728
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
729

730
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
731

732
                    let response = self
2✔
733
                        .global_state_lock
2✔
734
                        .lock_guard()
2✔
735
                        .await
2✔
736
                        .response_to_sync_challenge(sync_challenge)
2✔
737
                        .await;
2✔
738

739
                    match response {
2✔
740
                        Ok(resp) => resp,
×
741
                        Err(e) => {
2✔
742
                            warn!("could not generate sync challenge response:\n{e}");
2✔
743
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
744
                                .await?;
2✔
745
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
746
                        }
747
                    }
748
                };
749

750
                info!(
×
751
                    "Responding to sync challenge from {}",
×
752
                    self.peer_address.ip()
×
753
                );
754
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
755
                    .await?;
×
756

757
                Ok(KEEP_CONNECTION_ALIVE)
×
758
            }
759
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
760
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
761

762
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
763
                info!(
1✔
764
                    "Got sync challenge response from {}",
×
765
                    self.peer_address.ip()
×
766
                );
767

768
                // The purpose of the sync challenge and sync challenge response
769
                // is to avoid going into sync mode based on a malicious target
770
                // fork. Instead of verifying that the claimed proof-of-work
771
                // number is correct (which would require sending and verifying,
772
                // at least, all blocks between luca (whatever that is) and the
773
                // claimed tip), we use a heuristic that requires less
774
                // communication and less verification work. The downside of
775
                // using a heuristic here is a nonzero false positive and false
776
                // negative rate. Note that the false negative event
777
                // (maliciously sending someone into sync mode based on a bogus
778
                // fork) still requires a significant amount of work from the
779
                // attacker, *in addition* to being lucky. Also, down the line
780
                // succinctness (and more specifically, recursive block
781
                // validation) obviates this entire subprotocol.
782

783
                // Did we issue a challenge?
784
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
785
                    warn!("Sync challenge response was not prompted.");
×
786
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
787
                        .await?;
×
788
                    return Ok(KEEP_CONNECTION_ALIVE);
×
789
                };
790

791
                // Reset the challenge, regardless of the response's success.
792
                peer_state_info.sync_challenge = None;
1✔
793

794
                // Does response match issued challenge?
795
                if !challenge_response
1✔
796
                    .matches(self.global_state_lock.cli().network, issued_challenge)
1✔
797
                {
798
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
799
                        .await?;
×
800
                    return Ok(KEEP_CONNECTION_ALIVE);
×
801
                }
1✔
802

803
                // Does response verify?
804
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
805
                let now = self.now();
1✔
806
                if !challenge_response
1✔
807
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
808
                    .await
1✔
809
                {
810
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
811
                        .await?;
×
812
                    return Ok(KEEP_CONNECTION_ALIVE);
×
813
                }
1✔
814

815
                // Does cumulative proof-of-work evolve reasonably?
816
                let own_tip_header = *self
1✔
817
                    .global_state_lock
1✔
818
                    .lock_guard()
1✔
819
                    .await
1✔
820
                    .chain
821
                    .light_state()
1✔
822
                    .header();
1✔
823
                if !challenge_response
1✔
824
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
825
                {
826
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
827
                        .await?;
×
828
                    return Ok(KEEP_CONNECTION_ALIVE);
×
829
                }
1✔
830

831
                // Is there some specific (*i.e.*, not aggregate) proof of work?
832
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
833
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
834
                        .await?;
×
835
                    return Ok(KEEP_CONNECTION_ALIVE);
×
836
                }
1✔
837

838
                // Did it come in time?
839
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
840
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
841
                        .await?;
×
842
                    return Ok(KEEP_CONNECTION_ALIVE);
×
843
                }
1✔
844

845
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
846
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
847

848
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
849
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
850

851
                // Inform main loop
852
                self.to_main_tx
1✔
853
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
854
                        peer_address: self.peer_address,
1✔
855
                        claimed_height: claimed_tip_height,
1✔
856
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
857
                        claimed_block_mmra: sync_mmra_anchor,
1✔
858
                    })
1✔
859
                    .await?;
1✔
860

861
                Ok(KEEP_CONNECTION_ALIVE)
1✔
862
            }
863
            PeerMessage::BlockRequestByHash(block_digest) => {
×
864
                let block = self
×
865
                    .global_state_lock
×
866
                    .lock_guard()
×
867
                    .await
×
868
                    .chain
869
                    .archival_state()
×
870
                    .get_block(block_digest)
×
871
                    .await?;
×
872

873
                match block {
×
874
                    None => {
875
                        // TODO: Consider punishing here
876
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
877
                        Ok(KEEP_CONNECTION_ALIVE)
×
878
                    }
879
                    Some(b) => {
×
880
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
881
                            .await?;
×
882
                        Ok(KEEP_CONNECTION_ALIVE)
×
883
                    }
884
                }
885
            }
886
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
887
                let block_response = {
15✔
888
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
889

890
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
891

892
                    let canonical_block_digest = self
16✔
893
                        .global_state_lock
16✔
894
                        .lock_guard()
16✔
895
                        .await
16✔
896
                        .chain
897
                        .archival_state()
16✔
898
                        .archival_block_mmr
899
                        .ammr()
16✔
900
                        .try_get_leaf(block_height.into())
16✔
901
                        .await;
16✔
902

903
                    let Some(canonical_block_digest) = canonical_block_digest else {
16✔
904
                        let own_tip_height = self
1✔
905
                            .global_state_lock
1✔
906
                            .lock_guard()
1✔
907
                            .await
1✔
908
                            .chain
909
                            .light_state()
1✔
910
                            .header()
1✔
911
                            .height;
912
                        warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
913
                        self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
914
                            .await?;
1✔
915

916
                        return Ok(KEEP_CONNECTION_ALIVE);
1✔
917
                    };
918

919
                    let canonical_chain_block: Block = self
15✔
920
                        .global_state_lock
15✔
921
                        .lock_guard()
15✔
922
                        .await
15✔
923
                        .chain
924
                        .archival_state()
15✔
925
                        .get_block(canonical_block_digest)
15✔
926
                        .await?
15✔
927
                        .unwrap();
15✔
928

929
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
930
                };
931

932
                debug!("Sending block");
15✔
933
                peer.send(block_response).await?;
15✔
934
                debug!("Sent block");
15✔
935
                Ok(KEEP_CONNECTION_ALIVE)
15✔
936
            }
937
            PeerMessage::Block(t_block) => {
32✔
938
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
939

940
                info!(
32✔
941
                    "Got new block from peer {}, height {}, mined {}",
×
942
                    self.peer_address,
943
                    t_block.header.height,
944
                    t_block.header.timestamp.standard_format()
×
945
                );
946
                let new_block_height = t_block.header.height;
32✔
947

948
                let block = match Block::try_from(*t_block) {
32✔
949
                    Ok(block) => Box::new(block),
32✔
950
                    Err(e) => {
×
951
                        warn!("Peer sent invalid block: {e:?}");
×
952
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
953
                            .await?;
×
954

955
                        return Ok(KEEP_CONNECTION_ALIVE);
×
956
                    }
957
                };
958

959
                // Update the value for the highest known height that peer possesses iff
960
                // we are not in a fork reconciliation state.
961
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
962
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
963
                }
22✔
964

965
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
966

967
                // Reward happens as part of `try_ensure_path`
968

969
                Ok(KEEP_CONNECTION_ALIVE)
31✔
970
            }
971
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
972
                known_blocks,
8✔
973
                max_response_len,
8✔
974
                anchor,
8✔
975
            }) => {
976
                debug!(
8✔
977
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
978
                    self.peer_address
979
                );
980

981
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
982
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
983
                        .await?;
×
984

985
                    return Ok(KEEP_CONNECTION_ALIVE);
×
986
                }
8✔
987

988
                // The last block in the list of the peer's known blocks is the
989
                // earliest block, block with lowest height, the peer has
990
                // requested. If it does not belong to canonical chain, none of
991
                // the later ones will. So we can do an early abort in that case.
992
                let least_preferred = match known_blocks.last() {
8✔
993
                    Some(least_preferred) => *least_preferred,
8✔
994
                    None => {
995
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
996
                            .await?;
×
997

998
                        return Ok(KEEP_CONNECTION_ALIVE);
×
999
                    }
1000
                };
1001

1002
                let state = self.global_state_lock.lock_guard().await;
8✔
1003
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1004
                let luca_is_known = state
8✔
1005
                    .chain
8✔
1006
                    .archival_state()
8✔
1007
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1008
                    .await;
8✔
1009
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1010
                    drop(state);
×
1011
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1012
                        .await?;
×
1013
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1014

1015
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1016
                }
8✔
1017

1018
                // Happy case: At least *one* of the blocks referenced by peer
1019
                // is known to us.
1020
                let first_block_in_response = {
8✔
1021
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1022
                    for block_digest in known_blocks {
10✔
1023
                        if state
10✔
1024
                            .chain
10✔
1025
                            .archival_state()
10✔
1026
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1027
                            .await
10✔
1028
                        {
1029
                            let height = state
8✔
1030
                                .chain
8✔
1031
                                .archival_state()
8✔
1032
                                .get_block_header(block_digest)
8✔
1033
                                .await
8✔
1034
                                .unwrap()
8✔
1035
                                .height;
1036
                            first_block_in_response = Some(height);
8✔
1037
                            debug!(
8✔
1038
                                "Found block in canonical chain for batch response: {}",
×
1039
                                block_digest
1040
                            );
1041
                            break;
8✔
1042
                        }
2✔
1043
                    }
1044

1045
                    first_block_in_response
8✔
1046
                        .expect("existence of LUCA should have been established already.")
8✔
1047
                };
1048

1049
                debug!(
8✔
1050
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1051
                 Now building response from that height."
×
1052
                );
1053

1054
                // Get the relevant blocks, at most batch-size many, descending from the
1055
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1056
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1057
                let max_response_len = cmp::min(
8✔
1058
                    max_response_len,
8✔
1059
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1060
                );
1061
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1062
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1063

1064
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1065
                let response_start_height: u64 = first_block_in_response.into();
8✔
1066
                let mut i: u64 = 1;
8✔
1067
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1068
                    let block_height = response_start_height + i;
31✔
1069
                    match state
31✔
1070
                        .chain
31✔
1071
                        .archival_state()
31✔
1072
                        .archival_block_mmr
31✔
1073
                        .ammr()
31✔
1074
                        .try_get_leaf(block_height)
31✔
1075
                        .await
31✔
1076
                    {
1077
                        Some(digest) => {
23✔
1078
                            digests_of_returned_blocks.push(digest);
23✔
1079
                        }
23✔
1080
                        None => break,
8✔
1081
                    }
1082
                    i += 1;
23✔
1083
                }
1084

1085
                let mut returned_blocks: Vec<Block> =
8✔
1086
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1087
                for block_digest in digests_of_returned_blocks {
31✔
1088
                    let block = state
23✔
1089
                        .chain
23✔
1090
                        .archival_state()
23✔
1091
                        .get_block(block_digest)
23✔
1092
                        .await?
23✔
1093
                        .unwrap();
23✔
1094
                    returned_blocks.push(block);
23✔
1095
                }
1096

1097
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1098

1099
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1100
                drop(state);
8✔
1101

1102
                let Some(response) = response else {
8✔
1103
                    warn!("Unable to satisfy batch-block request");
×
1104
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1105
                        .await?;
×
1106
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1107
                };
1108

1109
                debug!("Returning {} blocks in batch response", response.len());
8✔
1110

1111
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1112
                peer.send(response).await?;
8✔
1113

1114
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1115
            }
1116
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1117
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1118

1119
                debug!(
×
1120
                    "handling block response batch with {} blocks",
×
1121
                    authenticated_blocks.len()
×
1122
                );
1123

1124
                // (Alan:) why is there even a minimum?
1125
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1126
                    warn!("Got smaller batch response than allowed");
×
1127
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1128
                        .await?;
×
1129
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1130
                }
×
1131

1132
                // Verify that we are in fact in syncing mode
1133
                // TODO: Separate peer messages into those allowed under syncing
1134
                // and those that are not
1135
                let Some(sync_anchor) = self
×
1136
                    .global_state_lock
×
1137
                    .lock_guard()
×
1138
                    .await
×
1139
                    .net
1140
                    .sync_anchor
1141
                    .clone()
×
1142
                else {
1143
                    warn!("Received a batch of blocks without being in syncing mode");
×
1144
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1145
                        .await?;
×
1146
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1147
                };
1148

1149
                // Verify that the response matches the current state
1150
                // We get the latest block from the DB here since this message is
1151
                // only valid for archival nodes.
1152
                let (first_block, _) = &authenticated_blocks[0];
×
1153
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1154
                let most_canonical_own_block_match: Option<Block> = self
×
1155
                    .global_state_lock
×
1156
                    .lock_guard()
×
1157
                    .await
×
1158
                    .chain
1159
                    .archival_state()
×
1160
                    .get_block(first_blocks_parent_digest)
×
1161
                    .await
×
1162
                    .expect("Block lookup must succeed");
×
1163
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1164
                    Some(block) => block,
×
1165
                    None => {
1166
                        warn!("Got batch response with invalid start block");
×
1167
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1168
                            .await?;
×
1169
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1170
                    }
1171
                };
1172

1173
                // Convert all blocks to Block objects
1174
                debug!(
×
1175
                    "Found own block of height {} to match received batch",
×
1176
                    most_canonical_own_block_match.kernel.header.height
×
1177
                );
1178
                let mut received_blocks = vec![];
×
1179
                for (t_block, membership_proof) in authenticated_blocks {
×
1180
                    let Ok(block) = Block::try_from(t_block) else {
×
1181
                        warn!("Received invalid transfer block from peer");
×
1182
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1183
                            .await?;
×
1184
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1185
                    };
1186

1187
                    if !membership_proof.verify(
×
1188
                        block.header().height.into(),
×
1189
                        block.hash(),
×
1190
                        &sync_anchor.block_mmr.peaks(),
×
1191
                        sync_anchor.block_mmr.num_leafs(),
×
1192
                    ) {
×
1193
                        warn!("Authentication of received block fails relative to anchor");
×
1194
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1195
                            .await?;
×
1196
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1197
                    }
×
1198

1199
                    received_blocks.push(block);
×
1200
                }
1201

1202
                // Get the latest block that we know of and handle all received blocks
1203
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1204
                    .await?;
×
1205

1206
                // Reward happens as part of `handle_blocks`.
1207

1208
                Ok(KEEP_CONNECTION_ALIVE)
×
1209
            }
1210
            PeerMessage::UnableToSatisfyBatchRequest => {
1211
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1212
                warn!(
×
1213
                    "Peer {} reports inability to satisfy batch request.",
×
1214
                    self.peer_address
1215
                );
1216

1217
                Ok(KEEP_CONNECTION_ALIVE)
×
1218
            }
1219
            PeerMessage::Handshake { .. } => {
1220
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1221

1222
                // The handshake should have been sent during connection
1223
                // initialization. Here it is out of order at best, malicious at
1224
                // worst.
1225
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1226
                Ok(KEEP_CONNECTION_ALIVE)
×
1227
            }
1228
            PeerMessage::ConnectionStatus(_) => {
1229
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1230

1231
                // The connection status should have been sent during connection
1232
                // initialization. Here it is out of order at best, malicious at
1233
                // worst.
1234

1235
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1236
                Ok(KEEP_CONNECTION_ALIVE)
×
1237
            }
1238
            PeerMessage::Transaction(transaction) => {
7✔
1239
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
7✔
1240

1241
                debug!(
7✔
1242
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1243
                    transaction.kernel.inputs.len(),
×
1244
                    transaction.kernel.outputs.len(),
×
1245
                    transaction.kernel.mutator_set_hash
×
1246
                );
1247

1248
                let transaction: Transaction = (*transaction).into();
7✔
1249

1250
                let (tip, mutator_set_accumulator_after, current_block_height) = {
7✔
1251
                    let state = self.global_state_lock.lock_guard().await;
7✔
1252

1253
                    (
7✔
1254
                        state.chain.light_state().hash(),
7✔
1255
                        state
7✔
1256
                            .chain
7✔
1257
                            .light_state()
7✔
1258
                            .mutator_set_accumulator_after()
7✔
1259
                            .expect("Block from state must have mutator set after"),
7✔
1260
                        state.chain.light_state().header().height,
7✔
1261
                    )
7✔
1262
                };
1263

1264
                // 1. If transaction is invalid, punish.
1265
                let network = self.global_state_lock.cli().network;
7✔
1266
                let consensus_rule_set =
7✔
1267
                    ConsensusRuleSet::infer_from(network, current_block_height);
7✔
1268
                if !transaction.is_valid(network, consensus_rule_set).await {
7✔
1269
                    warn!("Received invalid tx");
×
1270
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1271
                        .await?;
×
1272
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1273
                }
7✔
1274

1275
                // 2. If transaction has coinbase, punish.
1276
                // Transactions received from peers have not been mined yet.
1277
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
1278
                if transaction.kernel.coinbase.is_some() {
7✔
1279
                    warn!("Received non-mined transaction with coinbase.");
×
1280
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1281
                        .await?;
×
1282
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1283
                }
7✔
1284

1285
                // 3. If negative fee, punish.
1286
                if transaction.kernel.fee.is_negative() {
7✔
1287
                    warn!("Received negative-fee transaction.");
×
1288
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1289
                        .await?;
×
1290
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1291
                }
7✔
1292

1293
                // 4. If transaction is already known, ignore.
1294
                if self
7✔
1295
                    .global_state_lock
7✔
1296
                    .lock_guard()
7✔
1297
                    .await
7✔
1298
                    .mempool
1299
                    .contains_with_higher_proof_quality(
7✔
1300
                        transaction.kernel.txid(),
7✔
1301
                        transaction.proof.proof_quality()?,
7✔
1302
                        transaction.kernel.mutator_set_hash,
7✔
1303
                    )
1304
                {
1305
                    warn!("Received transaction that was already known");
×
1306

1307
                    // We received a transaction that we *probably* haven't requested.
1308
                    // Consider punishing here, if this is abused.
1309
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1310
                }
7✔
1311

1312
                // 5. If transaction is not confirmable, punish.
1313
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
7✔
1314
                    warn!(
×
1315
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1316
                        transaction.kernel.txid()
×
1317
                    );
1318
                    // get fine-grained error code for informative logging
1319
                    let confirmability_error_code = transaction
×
1320
                        .kernel
×
1321
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1322
                    match confirmability_error_code {
×
1323
                        Ok(_) => unreachable!(),
×
1324
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1325
                            warn!("invalid removal record (at index {index})");
×
1326
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1327
                            let removal_record_error_code = invalid_removal_record
×
1328
                                .validate_inner(&mutator_set_accumulator_after);
×
1329
                            debug!(
×
1330
                                "Absolute index set of removal record {index}: {:?}",
×
1331
                                invalid_removal_record.absolute_indices
1332
                            );
1333
                            match removal_record_error_code {
×
1334
                                Ok(_) => unreachable!(),
×
1335
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1336
                                    debug!("invalid because membership proof is missing");
×
1337
                                }
1338
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1339
                                    chunk_index,
×
1340
                                }) => {
1341
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1342
                                }
1343
                            };
1344
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1345
                                .await?;
×
1346
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1347
                        }
1348
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1349
                            warn!("duplicate inputs");
×
1350
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1351
                                .await?;
×
1352
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1353
                        }
1354
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1355
                            warn!("already spent input (at index {index})");
×
1356
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1357
                                .await?;
×
1358
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1359
                        }
1360
                        Err(TransactionConfirmabilityError::RemovalRecordUnpackFailure) => {
1361
                            warn!("Failed to unpack removal records");
×
1362
                            self.punish(NegativePeerSanction::InvalidTransaction)
×
1363
                                .await?;
×
1364
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1365
                        }
1366
                    };
1367
                }
7✔
1368

1369
                // If transaction cannot be applied to mutator set, punish.
1370
                // I don't think this can happen when above checks pass but we include
1371
                // the check to ensure that transaction can be applied.
1372

1373
                // TODO: Try unpacking tx-inputs
1374
                let ms_update = MutatorSetUpdate::new(
7✔
1375
                    transaction.kernel.inputs.clone(),
7✔
1376
                    transaction.kernel.outputs.clone(),
7✔
1377
                );
1378
                let can_apply = ms_update
7✔
1379
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
7✔
1380
                    .is_ok();
7✔
1381
                if !can_apply {
7✔
1382
                    warn!("Cannot apply transaction to current mutator set.");
×
1383
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1384
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1385
                        .await?;
×
1386
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1387
                }
7✔
1388

1389
                let tx_timestamp = transaction.kernel.timestamp;
7✔
1390

1391
                // 6. Ignore if transaction is too old
1392
                let now = self.now();
7✔
1393
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
7✔
1394
                    // TODO: Consider punishing here
1395
                    warn!("Received too old tx");
×
1396
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1397
                }
7✔
1398

1399
                // 7. Ignore if transaction is too far into the future
1400
                if tx_timestamp
7✔
1401
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
7✔
1402
                {
1403
                    // TODO: Consider punishing here
1404
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1405
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1406
                }
7✔
1407

1408
                // Otherwise, relay to main
1409
                let pt2m_transaction = PeerTaskToMainTransaction {
7✔
1410
                    transaction,
7✔
1411
                    confirmable_for_block: tip,
7✔
1412
                };
7✔
1413
                self.to_main_tx
7✔
1414
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
7✔
1415
                    .await?;
7✔
1416

1417
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1418
            }
1419
            PeerMessage::TransactionNotification(tx_notification) => {
14✔
1420
                // addresses #457
1421
                // new scope for state read-lock to avoid holding across peer.send()
1422
                {
1423
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
14✔
1424

1425
                    // 1. Ignore if we already know this transaction, and
1426
                    // the proof quality is not higher than what we already know.
1427
                    let state = self.global_state_lock.lock_guard().await;
14✔
1428
                    let transaction_of_same_or_higher_proof_quality_is_known =
14✔
1429
                        state.mempool.contains_with_higher_proof_quality(
14✔
1430
                            tx_notification.txid,
14✔
1431
                            tx_notification.proof_quality,
14✔
1432
                            tx_notification.mutator_set_hash,
14✔
1433
                        );
1434
                    if transaction_of_same_or_higher_proof_quality_is_known {
14✔
1435
                        debug!("transaction with same or higher proof quality was already known");
7✔
1436
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1437
                    }
7✔
1438

1439
                    // Only accept transactions that do not require executing
1440
                    // `update`.
1441
                    if state
7✔
1442
                        .chain
7✔
1443
                        .light_state()
7✔
1444
                        .mutator_set_accumulator_after()
7✔
1445
                        .expect("Block from state must have mutator set after")
7✔
1446
                        .hash()
7✔
1447
                        != tx_notification.mutator_set_hash
7✔
1448
                    {
1449
                        debug!("transaction refers to non-canonical mutator set state");
×
1450
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1451
                    }
7✔
1452
                }
1453

1454
                // 2. Request the actual `Transaction` from peer
1455
                debug!("requesting transaction from peer");
7✔
1456
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
7✔
1457
                    .await?;
7✔
1458

1459
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1460
            }
1461
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1462
                let state = self.global_state_lock.lock_guard().await;
5✔
1463
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1464
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1465
                };
1466

1467
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1468
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1469
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1470
                };
1471

1472
                // Drop state immediately to prevent holding over a response.
1473
                drop(state);
4✔
1474

1475
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1476
                    .await?;
4✔
1477

1478
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1479
            }
1480
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1481
                let verdict = self
1✔
1482
                    .global_state_lock
1✔
1483
                    .lock_guard()
1✔
1484
                    .await
1✔
1485
                    .favor_incoming_block_proposal_legacy(
1✔
1486
                        block_proposal_notification.height,
1✔
1487
                        block_proposal_notification.guesser_fee,
1✔
1488
                    );
1489
                match verdict {
1✔
1490
                    Ok(_) => {
1491
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1492
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1493
                        ))
1✔
1494
                        .await?
1✔
1495
                    }
1496
                    Err(reject_reason) => {
×
1497
                        info!(
×
1498
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1499
                        {}. Reason:\n{reject_reason}",
×
1500
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1501
                        self.peer_address
1502
                    )
1503
                    }
1504
                }
1505

1506
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1507
            }
1508
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1509
                let matching_proposal = self
×
1510
                    .global_state_lock
×
1511
                    .lock_guard()
×
1512
                    .await
×
1513
                    .mining_state
1514
                    .block_proposal
1515
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1516
                    .map(|x| x.to_owned());
×
1517
                if let Some(proposal) = matching_proposal {
×
1518
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1519
                        .await?;
×
1520
                } else {
1521
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1522
                        .await?;
×
1523
                }
1524

1525
                Ok(KEEP_CONNECTION_ALIVE)
×
1526
            }
1527
            PeerMessage::BlockProposal(new_proposal) => {
1✔
1528
                info!("Got block proposal from peer.");
1✔
1529

1530
                // Is the proposal valid?
1531
                // Lock needs to be held here because race conditions: otherwise
1532
                // the block proposal that was validated might not match with
1533
                // the one whose favorability is being computed.
1534
                let state = self.global_state_lock.lock_guard().await;
1✔
1535
                let tip = state.chain.light_state();
1✔
1536
                let proposal_is_valid = new_proposal
1✔
1537
                    .is_valid(tip, self.now(), self.global_state_lock.cli().network)
1✔
1538
                    .await;
1✔
1539
                if !proposal_is_valid {
1✔
1540
                    drop(state);
×
1541
                    self.punish(NegativePeerSanction::InvalidBlockProposal)
×
1542
                        .await?;
×
1543
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1544
                }
1✔
1545

1546
                // Is block proposal favorable?
1547
                let is_favorable = state.favor_incoming_block_proposal(
1✔
1548
                    new_proposal.header().prev_block_digest,
1✔
1549
                    new_proposal
1✔
1550
                        .total_guesser_reward()
1✔
1551
                        .expect("Block was validated"),
1✔
1552
                );
1✔
1553
                drop(state);
1✔
1554

1555
                if let Err(rejection_reason) = is_favorable {
1✔
1556
                    match rejection_reason {
×
1557
                        // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1558
                        BlockProposalRejectError::InsufficientFee { current, received }
×
1559
                            if Some(received) == current =>
×
1560
                        {
1561
                            debug!("ignoring new block proposal because the fee is equal to the present one");
×
1562
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1563
                        }
1564
                        _ => {
1565
                            warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1566
                            self.punish(NegativePeerSanction::NonFavorableBlockProposal)
×
1567
                                .await?;
×
1568
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1569
                        }
1570
                    }
1571
                };
1✔
1572

1573
                self.send_to_main(PeerTaskToMain::BlockProposal(new_proposal), line!())
1✔
1574
                    .await?;
1✔
1575

1576
                // Valuable, new, hard-to-produce information. Reward peer.
1577
                self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1578

1579
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1580
            }
1581
        }
1582
    }
154✔
1583

1584
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1585
    ///
1586
    /// the channel could potentially fill up in which case the send() will
1587
    /// block until there is capacity.  we wrap the send() so we can log if
1588
    /// that ever happens to the extent it passes slow-scope threshold.
1589
    async fn send_to_main(
1✔
1590
        &self,
1✔
1591
        msg: PeerTaskToMain,
1✔
1592
        line: u32,
1✔
1593
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1594
        // we measure across the send() in case the channel ever fills up.
1595
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1596

1597
        self.to_main_tx.send(msg).await
1✔
1598
    }
1✔
1599

1600
    /// Handle message from main task. The boolean return value indicates if
1601
    /// the connection should be closed.
1602
    ///
1603
    /// Locking:
1604
    ///   * acquires `global_state_lock` for write via Self::punish()
1605
    async fn handle_main_task_message<S>(
29✔
1606
        &mut self,
29✔
1607
        msg: MainToPeerTask,
29✔
1608
        peer: &mut S,
29✔
1609
        peer_state_info: &mut MutablePeerState,
29✔
1610
    ) -> Result<bool>
29✔
1611
    where
29✔
1612
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
29✔
1613
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
29✔
1614
        <S as TryStream>::Error: std::error::Error,
29✔
1615
    {
29✔
1616
        debug!("Handling {} message from main in peer loop", msg.get_type());
29✔
1617
        match msg {
29✔
1618
            MainToPeerTask::Block(block) => {
23✔
1619
                // We don't currently differentiate whether a new block came from a peer, or from our
1620
                // own miner. It's always shared through this logic.
1621
                let new_block_height = block.kernel.header.height;
23✔
1622
                if new_block_height > peer_state_info.highest_shared_block_height {
23✔
1623
                    debug!("Sending PeerMessage::BlockNotification");
12✔
1624
                    peer_state_info.highest_shared_block_height = new_block_height;
12✔
1625
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
12✔
1626
                        .await?;
12✔
1627
                    debug!("Sent PeerMessage::BlockNotification");
12✔
1628
                }
11✔
1629
                Ok(KEEP_CONNECTION_ALIVE)
23✔
1630
            }
1631
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1632
                // Only ask one of the peers about the batch of blocks
1633
                if batch_block_request.peer_addr_target != self.peer_address {
×
1634
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1635
                }
×
1636

1637
                let max_response_len = std::cmp::min(
×
1638
                    STANDARD_BLOCK_BATCH_SIZE,
1639
                    self.global_state_lock.cli().sync_mode_threshold,
×
1640
                );
1641

1642
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1643
                    known_blocks: batch_block_request.known_blocks,
×
1644
                    max_response_len,
×
1645
                    anchor: batch_block_request.anchor_mmr,
×
1646
                }))
×
1647
                .await?;
×
1648

1649
                Ok(KEEP_CONNECTION_ALIVE)
×
1650
            }
1651
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1652
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1653

1654
                if self.peer_address != socket_addr {
×
1655
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1656
                }
×
1657

1658
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1659
                    .await?;
×
1660

1661
                // If this peer failed the last synchronization attempt, we only
1662
                // sanction, we don't disconnect.
1663
                Ok(KEEP_CONNECTION_ALIVE)
×
1664
            }
1665
            MainToPeerTask::MakePeerDiscoveryRequest => {
1666
                peer.send(PeerMessage::PeerListRequest).await?;
×
1667
                Ok(KEEP_CONNECTION_ALIVE)
×
1668
            }
1669
            MainToPeerTask::Disconnect(peer_address) => {
×
1670
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1671

1672
                // Only disconnect from the peer the main task requested a disconnect for.
1673
                if peer_address != self.peer_address {
×
1674
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1675
                }
×
1676
                self.register_peer_disconnection().await;
×
1677

1678
                Ok(DISCONNECT_CONNECTION)
×
1679
            }
1680
            MainToPeerTask::DisconnectAll() => {
1681
                self.register_peer_disconnection().await;
×
1682

1683
                Ok(DISCONNECT_CONNECTION)
×
1684
            }
1685
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1686
                if target_socket_addr == self.peer_address {
×
1687
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1688
                }
×
1689
                Ok(KEEP_CONNECTION_ALIVE)
×
1690
            }
1691
            MainToPeerTask::TransactionNotification(transaction_notification) => {
6✔
1692
                debug!("Sending PeerMessage::TransactionNotification");
6✔
1693
                peer.send(PeerMessage::TransactionNotification(
6✔
1694
                    transaction_notification,
6✔
1695
                ))
6✔
1696
                .await?;
6✔
1697
                debug!("Sent PeerMessage::TransactionNotification");
6✔
1698
                Ok(KEEP_CONNECTION_ALIVE)
6✔
1699
            }
1700
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1701
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1702
                peer.send(PeerMessage::BlockProposalNotification(
×
1703
                    block_proposal_notification,
×
1704
                ))
×
1705
                .await?;
×
1706
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1707
                Ok(KEEP_CONNECTION_ALIVE)
×
1708
            }
1709
        }
1710
    }
29✔
1711

1712
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1713
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1714
    async fn run<S>(
50✔
1715
        &mut self,
50✔
1716
        mut peer: S,
50✔
1717
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
50✔
1718
        peer_state_info: &mut MutablePeerState,
50✔
1719
    ) -> Result<()>
50✔
1720
    where
50✔
1721
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
50✔
1722
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
50✔
1723
        <S as TryStream>::Error: std::error::Error,
50✔
1724
    {
50✔
1725
        loop {
1726
            select! {
190✔
1727
                // Handle peer messages
1728
                peer_message = peer.try_next() => {
190✔
1729
                    let peer_address = self.peer_address;
155✔
1730
                    let peer_message = match peer_message {
155✔
1731
                        Ok(message) => message,
154✔
1732
                        Err(err) => {
1✔
1733
                            // Don't disconnect if message type is unknown, as
1734
                            // this allows the adding of new message types in
1735
                            // the future. Consider only keeping connection open
1736
                            // if this is a deserialization error, and close
1737
                            // otherwise.
1738
                            let msg = format!("Error when receiving from peer: {peer_address}");
1✔
1739
                            warn!("{msg}. Error: {err}");
1✔
1740
                            self.punish(NegativePeerSanction::InvalidMessage).await?;
1✔
1741
                            continue;
1✔
1742
                        }
1743
                    };
1744
                    let Some(peer_message) = peer_message else {
154✔
1745
                        info!("Peer {peer_address} closed connection.");
×
1746
                        break;
×
1747
                    };
1748

1749
                    let syncing =
154✔
1750
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
154✔
1751
                    let message_type = peer_message.get_type();
154✔
1752
                    if peer_message.ignore_during_sync() && syncing {
154✔
1753
                        debug!(
×
1754
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1755
                        );
1756
                        continue;
×
1757
                    }
154✔
1758
                    if peer_message.ignore_when_not_sync() && !syncing {
154✔
1759
                        debug!(
×
1760
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1761
                        );
1762
                        continue;
×
1763
                    }
154✔
1764

1765
                    match self
154✔
1766
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
154✔
1767
                        .await
154✔
1768
                    {
1769
                        Ok(false) => {}
110✔
1770
                        Ok(true) => {
1771
                            info!("Closing connection to {peer_address}");
43✔
1772
                            break;
43✔
1773
                        }
1774
                        Err(err) => {
1✔
1775
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1776
                            bail!("{err}");
1✔
1777
                        }
1778
                    };
1779
                }
1780

1781
                // Handle messages from main task
1782
                main_msg_res = from_main_rx.recv() => {
190✔
1783
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
29✔
1784
                        let err_msg = format!("Failed to read from main loop: {err}");
×
1785
                        error!(err_msg);
×
1786
                        panic!("{err_msg}");
×
1787
                    });
1788
                    let close_connection = self
29✔
1789
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
29✔
1790
                        .await
29✔
1791
                        .unwrap_or_else(|err| {
29✔
1792
                            warn!("handle_main_task_message returned an error: {err}");
×
1793
                            true
×
1794
                        });
×
1795

1796
                    if close_connection {
29✔
1797
                        info!(
×
1798
                            "handle_main_task_message is closing the connection to {}",
×
1799
                            self.peer_address
1800
                        );
1801
                        break;
×
1802
                    }
29✔
1803
                }
1804
            }
1805
        }
1806

1807
        Ok(())
43✔
1808
    }
44✔
1809

1810
    /// Function called before entering the peer loop. Reads the potentially stored
1811
    /// peer standing from the database and does other book-keeping before entering
1812
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1813
    /// accepted for a connection for this loop to be entered. So we don't need
1814
    /// to check the standing again.
1815
    ///
1816
    /// Locking:
1817
    ///   * acquires `global_state_lock` for write
1818
    pub(crate) async fn run_wrapper<S>(
39✔
1819
        &mut self,
39✔
1820
        mut peer: S,
39✔
1821
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
39✔
1822
    ) -> Result<()>
39✔
1823
    where
39✔
1824
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
39✔
1825
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
39✔
1826
        <S as TryStream>::Error: std::error::Error,
39✔
1827
    {
39✔
1828
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1829

1830
        let cli_args = self.global_state_lock.cli().clone();
39✔
1831

1832
        let standing = self
39✔
1833
            .global_state_lock
39✔
1834
            .lock_guard()
39✔
1835
            .await
39✔
1836
            .net
1837
            .peer_databases
1838
            .peer_standings
1839
            .get(self.peer_address.ip())
39✔
1840
            .await
39✔
1841
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
38✔
1842

1843
        // Add peer to peer map
1844
        let peer_connection_info = PeerConnectionInfo::new(
38✔
1845
            self.peer_handshake_data.listen_port,
38✔
1846
            self.peer_address,
38✔
1847
            self.inbound_connection,
38✔
1848
        );
1849
        let new_peer = PeerInfo::new(
38✔
1850
            peer_connection_info,
38✔
1851
            &self.peer_handshake_data,
38✔
1852
            SystemTime::now(),
38✔
1853
            cli_args.peer_tolerance,
38✔
1854
        )
1855
        .with_standing(standing);
38✔
1856

1857
        // If timestamps are different, we currently just log a warning.
1858
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
38✔
1859
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
38✔
1860
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
38✔
1861
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
38✔
1862
        {
1863
            let own_datetime_utc: DateTime<Utc> =
×
1864
                new_peer.own_timestamp_connection_established.into();
×
1865
            let peer_datetime_utc: DateTime<Utc> =
×
1866
                new_peer.peer_timestamp_connection_established.into();
×
1867
            warn!(
×
1868
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1869
                new_peer.connected_address(),
×
1870
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1871
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1872
        }
38✔
1873

1874
        // Multiple tasks might attempt to set up a connection concurrently. So
1875
        // even though we've checked that this connection is allowed, this check
1876
        // could have been invalidated by another task, for one accepting an
1877
        // incoming connection from a peer we're currently connecting to. So we
1878
        // need to make the a check again while holding a write-lock, since
1879
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
1880
        // since it would have to be dropped before acquiring the write-lock.
1881
        {
1882
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
38✔
1883
            let peer_map = &mut global_state.net.peer_map;
38✔
1884
            if peer_map
38✔
1885
                .values()
38✔
1886
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
38✔
1887
            {
1888
                bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1889
            }
38✔
1890

1891
            if peer_map.len() >= cli_args.max_num_peers {
38✔
1892
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1893
            }
38✔
1894

1895
            if peer_map.contains_key(&self.peer_address) {
38✔
1896
                // This shouldn't be possible, unless the peer reports a different instance ID than
1897
                // for the other connection. Only a malignant client would do that.
1898
                bail!("Already connected to peer. Aborting connection");
×
1899
            }
38✔
1900

1901
            peer_map.insert(self.peer_address, new_peer);
38✔
1902
        }
1903

1904
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1905
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
38✔
1906

1907
        // If peer indicates more canonical block, request a block notification to catch up ASAP
1908
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
38✔
1909
            > self
38✔
1910
                .global_state_lock
38✔
1911
                .lock_guard()
38✔
1912
                .await
38✔
1913
                .chain
1914
                .light_state()
38✔
1915
                .kernel
1916
                .header
1917
                .cumulative_proof_of_work
1918
        {
1919
            // Send block notification request to catch up ASAP, in case we're
1920
            // behind the newly-connected peer.
1921
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1922
        }
38✔
1923

1924
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
38✔
1925
        debug!("Exited peer loop for {}", self.peer_address);
32✔
1926

1927
        close_peer_connected_callback(
32✔
1928
            self.global_state_lock.clone(),
32✔
1929
            self.peer_address,
32✔
1930
            &self.to_main_tx,
32✔
1931
        )
32✔
1932
        .await;
32✔
1933

1934
        debug!("Ending peer loop for {}", self.peer_address);
32✔
1935

1936
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1937
        // feature to have for testing purposes.
1938
        res
32✔
1939
    }
32✔
1940

1941
    /// Register graceful peer disconnection in the global state.
1942
    ///
    /// Passes the instance ID from this handler's handshake data, together
    /// with the current system time, to the networking state.
    ///
1943
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1944
    ///
1945
    /// # Locking:
1946
    ///   * acquires `global_state_lock` for write
1947
    ///
1948
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1949
    async fn register_peer_disconnection(&mut self) {
×
1950
        // The peer is identified by the instance ID stored in its handshake data.
        let peer_id = self.peer_handshake_data.instance_id;
×
1951
        // The write guard is a temporary of this statement, so it is dropped
        // as soon as the registration call has returned.
        self.global_state_lock
×
1952
            .lock_guard_mut()
×
1953
            .await
×
1954
            .net
1955
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1956
    }
×
1957
}
1958

1959
#[cfg(test)]
1960
#[cfg_attr(coverage_nightly, coverage(off))]
1961
mod tests {
1962
    use macro_rules_attr::apply;
1963
    use rand::rngs::StdRng;
1964
    use rand::Rng;
1965
    use rand::SeedableRng;
1966
    use tokio::sync::mpsc::error::TryRecvError;
1967
    use tracing_test::traced_test;
1968

1969
    use super::*;
1970
    use crate::config_models::cli_args;
1971
    use crate::config_models::network::Network;
1972
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1973
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1974
    use crate::models::peer::transaction_notification::TransactionNotification;
1975
    use crate::models::peer::Sanction;
1976
    use crate::models::state::mempool::upgrade_priority::UpgradePriority;
1977
    use crate::models::state::tx_creation_config::TxCreationConfig;
1978
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1979
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1980
    use crate::tests::shared::blocks::fake_valid_block_for_tests;
1981
    use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests;
1982
    use crate::tests::shared::globalstate::get_dummy_handshake_data_for_genesis;
1983
    use crate::tests::shared::globalstate::get_dummy_peer_connection_data_genesis;
1984
    use crate::tests::shared::globalstate::get_dummy_socket_address;
1985
    use crate::tests::shared::globalstate::get_test_genesis_setup;
1986
    use crate::tests::shared::mock_tx::invalid_empty_single_proof_transaction;
1987
    use crate::tests::shared::Action;
1988
    use crate::tests::shared::Mock;
1989
    use crate::tests::shared_tokio_runtime;
1990

1991
    // Verifies that a read error on the peer connection is sanctioned as
    // `NegativePeerSanction::InvalidMessage` while `run_wrapper` still returns
    // `Ok`.
    #[traced_test]
1992
    #[apply(shared_tokio_runtime)]
1993
    async fn no_disconnect_on_invalid_message() {
1994
        // This test is intended to simulate a deserialization error, which
1995
        // would occur if the node is connected to a newer version of the
1996
        // software which has new variants of `PeerMessage`. The intended
1997
        // behavior is that a small punishment is applied but that the
1998
        // connection stays open.
1999
        // The mocked peer first produces a read error and then sends `Bye`.
        let mock = Mock::new(vec![Action::ReadError, Action::Read(PeerMessage::Bye)]);
2000

2001
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2002
            get_test_genesis_setup(Network::Main, 1, cli_args::Args::default())
2003
                .await
2004
                .unwrap();
2005

2006
        let peer_address = get_dummy_socket_address(2);
2007
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2008
        let mut peer_loop_handler =
2009
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
2010
        // `unwrap` fails the test if the peer loop returned an error, i.e. if
        // the read error was not tolerated.
        peer_loop_handler
2011
            .run_wrapper(mock, from_main_rx_clone)
2012
            .await
2013
            .unwrap();
2014

2015
        // The standing is looked up in the database by the peer's IP address.
        let peer_standing = state_lock
2016
            .lock_guard()
2017
            .await
2018
            .net
2019
            .get_peer_standing_from_database(peer_address.ip())
2020
            .await;
        // The standing must equal the severity of one `InvalidMessage`
        // sanction, and that sanction must be the latest punishment.
2021
        assert_eq!(
2022
            NegativePeerSanction::InvalidMessage.severity(),
2023
            peer_standing.unwrap().standing
2024
        );
2025
        assert_eq!(
2026
            NegativePeerSanction::InvalidMessage,
2027
            peer_standing.unwrap().latest_punishment.unwrap().0
2028
        );
2029
    }
2030

2031
    // Verifies that the peer loop returns `Ok` when the peer sends `Bye`, and
    // that the peer map has length 2 afterwards.
    #[traced_test]
2032
    #[apply(shared_tokio_runtime)]
2033
    async fn test_peer_loop_bye() -> Result<()> {
2034
        // The only thing the mocked peer does is say goodbye.
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2035

2036
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2037
            get_test_genesis_setup(Network::Main, 2, cli_args::Args::default()).await?;
2038

2039
        let peer_address = get_dummy_socket_address(2);
2040
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2041
        let mut peer_loop_handler =
2042
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
2043
        peer_loop_handler
2044
            .run_wrapper(mock, from_main_rx_clone)
2045
            .await?;
2046

2047
        // The setup above was requested with `2`; after the goodbye the peer
        // map is expected to hold exactly that many entries again.
        assert_eq!(
2048
            2,
2049
            state_lock.lock_guard().await.net.peer_map.len(),
2050
            "peer map length must be back to 2 after goodbye"
2051
        );
2052

2053
        Ok(())
2054
    }
2055

2056
    // Verifies that a `PeerListRequest` is answered with a `PeerListResponse`
    // containing the two peers already present in the peer map followed by the
    // requesting peer itself.
    #[traced_test]
2057
    #[apply(shared_tokio_runtime)]
2058
    async fn test_peer_loop_peer_list() {
2059
        let network = Network::Main;
2060
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
2061
            get_test_genesis_setup(network, 2, cli_args::Args::default())
2062
                .await
2063
                .unwrap();
2064

2065
        // Snapshot the peers that are in the peer map before the new peer connects.
        let mut peer_infos = state_lock
2066
            .lock_guard()
2067
            .await
2068
            .net
2069
            .peer_map
2070
            .clone()
2071
            .into_values()
2072
            .collect::<Vec<_>>();
2073
        // Sort by connected address; `expected_response` below lists the
        // peers in this order.
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2074
        let (peer_address0, instance_id0) = (
2075
            peer_infos[0].connected_address(),
2076
            peer_infos[0].instance_id(),
2077
        );
2078
        let (peer_address1, instance_id1) = (
2079
            peer_infos[1].connected_address(),
2080
            peer_infos[1].instance_id(),
2081
        );
2082

2083
        // Connection data for the peer that issues the request.
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(network, 2);
2084
        let expected_response = vec![
2085
            (peer_address0, instance_id0),
2086
            (peer_address1, instance_id1),
2087
            (sa2, hsd2.instance_id),
2088
        ];
2089
        // The mock expects exactly this write in response to the request.
        let mock = Mock::new(vec![
2090
            Action::Read(PeerMessage::PeerListRequest),
2091
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
2092
            Action::Read(PeerMessage::Bye),
2093
        ]);
2094

2095
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2096

2097
        let mut peer_loop_handler =
2098
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
2099
        peer_loop_handler
2100
            .run_wrapper(mock, from_main_rx_clone)
2101
            .await
2102
            .unwrap();
2103

2104
        assert_eq!(
2105
            2,
2106
            state_lock.lock_guard().await.net.peer_map.len(),
2107
            "peer map must have length 2 after saying goodbye to peer 2"
2108
        );
2109
    }
2110

2111
    // Verifies that no disconnection time is stored for a peer when the peer
    // loop ends because the peer itself sent `Bye`.
    #[traced_test]
2112
    #[apply(shared_tokio_runtime)]
2113
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
2114
    {
2115
        let args = cli_args::Args::default();
2116
        let network = args.network;
2117
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
2118
            get_test_genesis_setup(network, 0, args).await?;
2119

2120
        let peer_address = get_dummy_socket_address(0);
2121
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
2122
        // Remember the instance ID before the handshake data is moved into the handler.
        let peer_id = peer_handshake_data.instance_id;
2123
        let mut peer_loop_handler = PeerLoopHandler::new(
2124
            to_main_tx,
2125
            state_lock.clone(),
2126
            peer_address,
2127
            peer_handshake_data,
2128
            true,
2129
            1,
2130
        );
2131
        // The peer's only message is `Bye`.
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2132
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
2133

2134
        let global_state = state_lock.lock_guard().await;
2135
        assert!(global_state
2136
            .net
2137
            .last_disconnection_time_of_peer(peer_id)
2138
            .is_none());
2139

2140
        // NOTE(review): the explicit drops presumably exist to keep both
        // channel ends alive until the assertion above has run -- confirm.
        drop(to_main_rx);
2141
        drop(from_main_tx);
2142

2143
        Ok(())
2144
    }
2145

2146
    mod blocks {
2147
        use super::*;
2148

2149
        // Verifies that a block descending from a foreign genesis block makes
        // `run_wrapper` fail, sends nothing but the max-block-height removal
        // to the main loop, and sanctions the peer with `DifferentGenesis`.
        #[traced_test]
2150
        #[apply(shared_tokio_runtime)]
2151
        async fn different_genesis_test() -> Result<()> {
2152
            // In this scenario a peer provides another genesis block than what has been
2153
            // hardcoded. This should lead to the closing of the connection to this peer
2154
            // and a ban.
2155

2156
            let network = Network::Main;
2157
            let (
2158
                _peer_broadcast_tx,
2159
                from_main_rx_clone,
2160
                to_main_tx,
2161
                mut to_main_rx1,
2162
                state_lock,
2163
                hsd,
2164
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2165
            // The standing assertion at the end of this test compares against
            // the peer tolerance, so pin the value the test was written for.
            assert_eq!(1000, state_lock.cli().peer_tolerance);
2166
            let peer_address = get_dummy_socket_address(0);
2167

2168
            // Although the database is empty, `get_tip` still returns the genesis block,
2169
            // since that block is hardcoded.
2170
            let mut different_genesis_block = state_lock
2171
                .lock_guard()
2172
                .await
2173
                .chain
2174
                .archival_state()
2175
                .get_tip()
2176
                .await;
2177

2178
            // Overwrite the header nonce so that this block differs from the
            // hardcoded genesis block, then build block 1 on top of it.
            different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2179
            let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2180
                &different_genesis_block,
2181
                Timestamp::hours(1),
2182
                StdRng::seed_from_u64(5550001).random(),
2183
                network,
2184
            )
2185
            .await;
2186
            let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2187
                block_1_with_different_genesis.try_into().unwrap(),
2188
            )))]);
2189

2190
            let mut peer_loop_handler = PeerLoopHandler::new(
2191
                to_main_tx.clone(),
2192
                state_lock.clone(),
2193
                peer_address,
2194
                hsd,
2195
                true,
2196
                1,
2197
            );
2198
            let res = peer_loop_handler
2199
                .run_wrapper(mock, from_main_rx_clone)
2200
                .await;
2201
            assert!(
2202
                res.is_err(),
2203
                "run_wrapper must return failure when genesis is different"
2204
            );
2205

2206
            match to_main_rx1.recv().await {
2207
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2208
                _ => bail!("Must receive remove of peer block max height"),
2209
            }
2210

2211
            // Verify that no further message was sent to main loop
2212
            match to_main_rx1.try_recv() {
2213
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2214
                _ => bail!("No further message must be sent to main loop for block with different genesis"),
2215
            };
2216

2217
            // A sender (`to_main_tx`) is kept alive until after the `try_recv`
            // above, so that an empty channel is not reported as disconnected.
            drop(to_main_tx);
2218

2219
            let peer_standing = state_lock
2220
                .lock_guard()
2221
                .await
2222
                .net
2223
                .get_peer_standing_from_database(peer_address.ip())
2224
                .await;
2225
            // The standing must have dropped to the negated peer tolerance.
            assert_eq!(
2226
                -i32::from(state_lock.cli().peer_tolerance),
2227
                peer_standing.unwrap().standing
2228
            );
2229
            assert_eq!(
2230
                NegativePeerSanction::DifferentGenesis,
2231
                peer_standing.unwrap().latest_punishment.unwrap().0
2232
            );
2233

2234
            Ok(())
2235
        }
2236

2237
        // Verifies that a block with (probably) invalid proof-of-work is not
        // forwarded to the main loop, does not make `run_wrapper` fail, and
        // leaves the peer with a negative standing in the database.
        #[traced_test]
2238
        #[apply(shared_tokio_runtime)]
2239
        async fn block_without_valid_pow_test() -> Result<()> {
2240
            // In this scenario, a block without a valid PoW is received. This block should be rejected
2241
            // by the peer loop and a notification should never reach the main loop.
2242

2243
            let network = Network::Main;
2244
            let (
2245
                peer_broadcast_tx,
2246
                _from_main_rx_clone,
2247
                to_main_tx,
2248
                mut to_main_rx1,
2249
                state_lock,
2250
                hsd,
2251
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2252
            let peer_address = get_dummy_socket_address(0);
2253
            let genesis_block: Block = state_lock
2254
                .lock_guard()
2255
                .await
2256
                .chain
2257
                .archival_state()
2258
                .get_tip()
2259
                .await;
2260

2261
            // Make a block on top of genesis; its nonce is overwritten below so
            // that its hash is (probably) above the proof-of-work threshold.
2262
            let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
2263
                &genesis_block,
2264
                Timestamp::hours(1),
2265
                StdRng::seed_from_u64(5550001).random(),
2266
                network,
2267
            )
2268
            .await;
2269

2270
            // This *probably* is invalid PoW -- and needs to be for this test to
2271
            // work.
2272
            block_without_valid_pow.set_header_nonce(Digest::default());
2273

2274
            // Sending an invalid block will not necessarily result in a ban. This depends on the peer
2275
            // tolerance that is set in the client. For this reason, we include a "Bye" here.
2276
            let mock = Mock::new(vec![
2277
                Action::Read(PeerMessage::Block(Box::new(
2278
                    block_without_valid_pow.clone().try_into().unwrap(),
2279
                ))),
2280
                Action::Read(PeerMessage::Bye),
2281
            ]);
2282

2283
            let from_main_rx_clone = peer_broadcast_tx.subscribe();
2284

2285
            // The handler's mocked time is set to the timestamp of the block
            // under test.
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2286
                to_main_tx.clone(),
2287
                state_lock.clone(),
2288
                peer_address,
2289
                hsd,
2290
                true,
2291
                1,
2292
                block_without_valid_pow.header().timestamp,
2293
            );
2294
            peer_loop_handler
2295
                .run_wrapper(mock, from_main_rx_clone)
2296
                .await
2297
                .expect("sending (one) invalid block should not result in closed connection");
2298

2299
            match to_main_rx1.recv().await {
2300
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2301
                _ => bail!("Must receive remove of peer block max height"),
2302
            }
2303

2304
            // Verify that no further message was sent to main loop
2305
            match to_main_rx1.try_recv() {
2306
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2307
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2308
            };
2309

2310
            // We need to have the transmitter in scope until we have received from it
2311
            // otherwise the receiver will report the disconnected error when we attempt
2312
            // to read from it. And the purpose is to verify that the channel is empty,
2313
            // not that it has been closed.
2314
            drop(to_main_tx);
2315

2316
            // Verify that peer standing was stored in database
2317
            let standing = state_lock
2318
                .lock_guard()
2319
                .await
2320
                .net
2321
                .peer_databases
2322
                .peer_standings
2323
                .get(peer_address.ip())
2324
                .await
2325
                .unwrap();
2326
            assert!(
2327
                standing.standing < 0,
2328
                "Peer must be sanctioned for sending a bad block"
2329
            );
2330

2331
            Ok(())
2332
        }
2333

2334
        // Verifies that receiving a block that is already the node's tip does
        // not produce a block notification to the main loop, and that the
        // peer map is empty once the peer has said goodbye.
        #[traced_test]
2335
        #[apply(shared_tokio_runtime)]
2336
        async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2337
            // The scenario tested here is that a client receives a block that is already
2338
            // known and stored. The expected behavior is to ignore the block and not send
2339
            // a message to the main task.
2340

2341
            let network = Network::Main;
2342
            let (
2343
                peer_broadcast_tx,
2344
                _from_main_rx_clone,
2345
                to_main_tx,
2346
                mut to_main_rx1,
2347
                mut alice,
2348
                hsd,
2349
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2350
            let peer_address = get_dummy_socket_address(0);
2351
            let genesis_block: Block = Block::genesis(network);
2352

2353
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
2354
            let block_1 =
2355
                fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2356
            assert!(
2357
                block_1.is_valid(&genesis_block, now, network).await,
2358
                "Block must be valid for this test to make sense"
2359
            );
2360
            // Store block 1 as the tip *before* the peer sends it, so that the
            // block is already known when it arrives.
            alice.set_new_tip(block_1.clone()).await?;
2361

2362
            let mock_peer_messages = Mock::new(vec![
2363
                Action::Read(PeerMessage::Block(Box::new(
2364
                    block_1.clone().try_into().unwrap(),
2365
                ))),
2366
                Action::Read(PeerMessage::Bye),
2367
            ]);
2368

2369
            let from_main_rx_clone = peer_broadcast_tx.subscribe();
2370

2371
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2372
                to_main_tx.clone(),
2373
                alice.clone(),
2374
                peer_address,
2375
                hsd,
2376
                false,
2377
                1,
2378
                block_1.header().timestamp,
2379
            );
2380
            alice_peer_loop_handler
2381
                .run_wrapper(mock_peer_messages, from_main_rx_clone)
2382
                .await?;
2383

2384
            match to_main_rx1.recv().await {
2385
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2386
                other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2387
            }
            // Verify that no further message was sent to main loop
2388
            match to_main_rx1.try_recv() {
2389
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2390
                _ => bail!("Block notification must not be sent for block that is already known"),
2391
            };
            // A sender (`to_main_tx`) is kept alive until after the `try_recv`
            // above, so that an empty channel is not reported as disconnected.
2392
            drop(to_main_tx);
2393

2394
            if !alice.lock_guard().await.net.peer_map.is_empty() {
2395
                bail!("peer map must be empty after closing connection gracefully");
2396
            }
2397

2398
            Ok(())
2399
        }
2400

2401
        // Verifies that a `BlockRequestBatch` naming a single known block is
        // answered with a batch response built from all blocks after it, for
        // every starting point from genesis up to block 4.
        #[traced_test]
2402
        #[apply(shared_tokio_runtime)]
2403
        async fn block_request_batch_simple() {
2404
            // Scenario: Six blocks (including genesis) are known. Peer requests
2405
            // from all possible starting points, and client responds with the
2406
            // correct list of blocks.
2407
            let network = Network::Main;
2408
            let (
2409
                _peer_broadcast_tx,
2410
                from_main_rx_clone,
2411
                to_main_tx,
2412
                _to_main_rx1,
2413
                mut state_lock,
2414
                handshake,
2415
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
2416
                .await
2417
                .unwrap();
2418
            let genesis_block: Block = Block::genesis(network);
2419
            let peer_address = get_dummy_socket_address(0);
2420
            let [block_1, block_2, block_3, block_4, block_5] =
2421
                fake_valid_sequence_of_blocks_for_tests(
2422
                    &genesis_block,
2423
                    Timestamp::hours(1),
2424
                    StdRng::seed_from_u64(5550001).random(),
2425
                    network,
2426
                )
2427
                .await;
2428
            let blocks = vec![
2429
                genesis_block,
2430
                block_1,
2431
                block_2,
2432
                block_3,
2433
                block_4,
2434
                block_5.clone(),
2435
            ];
2436
            // Genesis is skipped here; every later block is set as the new tip in turn.
            for block in blocks.iter().skip(1) {
2437
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
2438
            }
2439

2440
            // Accumulator of the archival block MMR after all blocks are
            // stored; used as the anchor of every request below.
            let mmra = state_lock
2441
                .lock_guard()
2442
                .await
2443
                .chain
2444
                .archival_state()
2445
                .archival_block_mmr
2446
                .ammr()
2447
                .to_accumulator_async()
2448
                .await;
2449
            // `i` indexes the block the peer claims to know: genesis (0) through block 4.
            for i in 0..=4 {
2450
                let expected_response = {
2451
                    let state = state_lock.lock_guard().await;
2452
                    // Everything after `blocks[i]` is expected in the response.
                    let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
2453
                    PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
2454
                        .await
2455
                        .unwrap()
2456
                };
2457
                let mock = Mock::new(vec![
2458
                    Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2459
                        known_blocks: vec![blocks[i].hash()],
2460
                        max_response_len: 14,
2461
                        anchor: mmra.clone(),
2462
                    })),
2463
                    Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
2464
                    Action::Read(PeerMessage::Bye),
2465
                ]);
2466
                // A fresh handler is created for every iteration.
                let mut peer_loop_handler = PeerLoopHandler::new(
2467
                    to_main_tx.clone(),
2468
                    state_lock.clone(),
2469
                    peer_address,
2470
                    handshake,
2471
                    false,
2472
                    1,
2473
                );
2474

2475
                peer_loop_handler
2476
                    .run_wrapper(mock, from_main_rx_clone.resubscribe())
2477
                    .await
2478
                    .unwrap();
2479
            }
2480
        }
2481

2482
        // Verifies batch responses in the presence of a fork: both a request
        // that only knows genesis and a request that knows the `2_b` fork
        // block are answered with blocks from the `_a` chain.
        #[traced_test]
2483
        #[apply(shared_tokio_runtime)]
2484
        async fn block_request_batch_in_order_test() -> Result<()> {
2485
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2486
            // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
2487
            // are returned.
2488

2489
            let network = Network::Main;
2490
            let (
2491
                _peer_broadcast_tx,
2492
                from_main_rx_clone,
2493
                to_main_tx,
2494
                _to_main_rx1,
2495
                mut state_lock,
2496
                hsd,
2497
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2498
            let genesis_block: Block = Block::genesis(network);
2499
            let peer_address = get_dummy_socket_address(0);
2500
            // Chain "a": genesis -> 1 -> 2_a -> 3_a.
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2501
                &genesis_block,
2502
                Timestamp::hours(1),
2503
                StdRng::seed_from_u64(5550001).random(),
2504
                network,
2505
            )
2506
            .await;
2507
            // Chain "b" forks off after block 1: 1 -> 2_b -> 3_b.
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2508
                &block_1,
2509
                Timestamp::hours(1),
2510
                StdRng::seed_from_u64(5550002).random(),
2511
                network,
2512
            )
2513
            .await;
2514
            assert_ne!(block_2_b.hash(), block_2_a.hash());
2515

2516
            // `block_3_a` is set as tip last; the expected responses below are
            // built from the `_a` blocks.
            state_lock.set_new_tip(block_1.clone()).await?;
2517
            state_lock.set_new_tip(block_2_a.clone()).await?;
2518
            state_lock.set_new_tip(block_2_b.clone()).await?;
2519
            state_lock.set_new_tip(block_3_b.clone()).await?;
2520
            state_lock.set_new_tip(block_3_a.clone()).await?;
2521

2522
            // Accumulator of the archival block MMR; used as the anchor of both requests.
            let anchor = state_lock
2523
                .lock_guard()
2524
                .await
2525
                .chain
2526
                .archival_state()
2527
                .archival_block_mmr
2528
                .ammr()
2529
                .to_accumulator_async()
2530
                .await;
2531
            // Peer only knows genesis: blocks 1, 2_a and 3_a are expected.
            let response_1 = {
2532
                let state_lock = state_lock.lock_guard().await;
2533
                PeerLoopHandler::batch_response(
2534
                    &state_lock,
2535
                    vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
2536
                    &anchor,
2537
                )
2538
                .await
2539
                .unwrap()
2540
            };
2541

2542
            let mut mock = Mock::new(vec![
2543
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2544
                    known_blocks: vec![genesis_block.hash()],
2545
                    max_response_len: 14,
2546
                    anchor: anchor.clone(),
2547
                })),
2548
                Action::Write(PeerMessage::BlockResponseBatch(response_1)),
2549
                Action::Read(PeerMessage::Bye),
2550
            ]);
2551

2552
            let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
2553
                to_main_tx.clone(),
2554
                state_lock.clone(),
2555
                peer_address,
2556
                hsd,
2557
                false,
2558
                1,
2559
                block_3_a.header().timestamp,
2560
            );
2561

2562
            // `resubscribe` leaves `from_main_rx_clone` available for the second handler.
            peer_loop_handler_1
2563
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
2564
                .await?;
2565

2566
            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2567
            let response_2 = {
2568
                let state_lock = state_lock.lock_guard().await;
2569
                PeerLoopHandler::batch_response(
2570
                    &state_lock,
2571
                    vec![block_2_a, block_3_a.clone()],
2572
                    &anchor,
2573
                )
2574
                .await
2575
                .unwrap()
2576
            };
2577
            // The known blocks are listed from highest to lowest: 2_b, 1, genesis.
            mock = Mock::new(vec![
2578
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2579
                    known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
2580
                    max_response_len: 14,
2581
                    anchor,
2582
                })),
2583
                Action::Write(PeerMessage::BlockResponseBatch(response_2)),
2584
                Action::Read(PeerMessage::Bye),
2585
            ]);
2586

2587
            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2588
                to_main_tx.clone(),
2589
                state_lock.clone(),
2590
                peer_address,
2591
                hsd,
2592
                false,
2593
                1,
2594
                block_3_a.header().timestamp,
2595
            );
2596

2597
            peer_loop_handler_2
2598
                .run_wrapper(mock, from_main_rx_clone)
2599
                .await?;
2600

2601
            Ok(())
2602
        }
2603

2604
        #[traced_test]
2605
        #[apply(shared_tokio_runtime)]
2606
        async fn block_request_batch_out_of_order_test() -> Result<()> {
2607
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2608
            // A peer requests a batch of blocks starting from block 1, but the peer supplies their
2609
            // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
2610
            // The blocks will be supplied in the correct order but starting from the first digest in
2611
            // the list that is known and canonical.
2612

2613
            let network = Network::Main;
2614
            let (
2615
                _peer_broadcast_tx,
2616
                from_main_rx_clone,
2617
                to_main_tx,
2618
                _to_main_rx1,
2619
                mut state_lock,
2620
                hsd,
2621
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2622
            let genesis_block = Block::genesis(network);
2623
            let peer_address = get_dummy_socket_address(0);
2624
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2625
                &genesis_block,
2626
                Timestamp::hours(1),
2627
                StdRng::seed_from_u64(5550001).random(),
2628
                network,
2629
            )
2630
            .await;
2631
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2632
                &block_1,
2633
                Timestamp::hours(1),
2634
                StdRng::seed_from_u64(5550002).random(),
2635
                network,
2636
            )
2637
            .await;
2638
            assert_ne!(block_2_a.hash(), block_2_b.hash());
2639

2640
            state_lock.set_new_tip(block_1.clone()).await?;
2641
            state_lock.set_new_tip(block_2_a.clone()).await?;
2642
            state_lock.set_new_tip(block_2_b.clone()).await?;
2643
            state_lock.set_new_tip(block_3_b.clone()).await?;
2644
            state_lock.set_new_tip(block_3_a.clone()).await?;
2645

2646
            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2647
            let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
2648
            expected_anchor.append(block_3_a.hash());
2649
            let state_anchor = state_lock
2650
                .lock_guard()
2651
                .await
2652
                .chain
2653
                .archival_state()
2654
                .archival_block_mmr
2655
                .ammr()
2656
                .to_accumulator_async()
2657
                .await;
2658
            assert_eq!(
2659
                expected_anchor, state_anchor,
2660
                "Catching assumption about MMRA in tip and in archival state"
2661
            );
2662

2663
            let response = {
2664
                let state_lock = state_lock.lock_guard().await;
2665
                PeerLoopHandler::batch_response(
2666
                    &state_lock,
2667
                    vec![block_1.clone(), block_2_a, block_3_a.clone()],
2668
                    &expected_anchor,
2669
                )
2670
                .await
2671
                .unwrap()
2672
            };
2673
            let mock = Mock::new(vec![
2674
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2675
                    known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
2676
                    max_response_len: 14,
2677
                    anchor: expected_anchor,
2678
                })),
2679
                // Since genesis block is the 1st known in the list of known blocks,
2680
                // its immediate descendant, block_1, is the first one returned.
2681
                Action::Write(PeerMessage::BlockResponseBatch(response)),
2682
                Action::Read(PeerMessage::Bye),
2683
            ]);
2684

2685
            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2686
                to_main_tx.clone(),
2687
                state_lock.clone(),
2688
                peer_address,
2689
                hsd,
2690
                false,
2691
                1,
2692
                block_3_a.header().timestamp,
2693
            );
2694

2695
            peer_loop_handler_2
2696
                .run_wrapper(mock, from_main_rx_clone)
2697
                .await?;
2698

2699
            Ok(())
2700
        }
2701

2702
        #[traced_test]
2703
        #[apply(shared_tokio_runtime)]
2704
        async fn request_unknown_height_doesnt_crash() {
2705
            // Scenario: Only genesis block is known. Peer requests block of height
2706
            // 2.
2707
            let network = Network::Main;
2708
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2709
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2710
                    .await
2711
                    .unwrap();
2712
            let peer_address = get_dummy_socket_address(0);
2713
            let mock = Mock::new(vec![
2714
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2715
                Action::Read(PeerMessage::Bye),
2716
            ]);
2717

2718
            let mut peer_loop_handler = PeerLoopHandler::new(
2719
                to_main_tx.clone(),
2720
                state_lock.clone(),
2721
                peer_address,
2722
                hsd,
2723
                false,
2724
                1,
2725
            );
2726

2727
            // This will return error if seen read/write order does not match that of the
2728
            // mocked object.
2729
            peer_loop_handler
2730
                .run_wrapper(mock, from_main_rx_clone)
2731
                .await
2732
                .unwrap();
2733

2734
            // Verify that peer is sanctioned for this nonsense.
2735
            assert!(state_lock
2736
                .lock_guard()
2737
                .await
2738
                .net
2739
                .get_peer_standing_from_database(peer_address.ip())
2740
                .await
2741
                .unwrap()
2742
                .standing
2743
                .is_negative());
2744
        }
2745

2746
        #[traced_test]
2747
        #[apply(shared_tokio_runtime)]
2748
        async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
2749
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2750
            // A peer requests a block at height 2. Verify that the correct block at height 2 is
2751
            // returned.
2752

2753
            let network = Network::Main;
2754
            let (
2755
                _peer_broadcast_tx,
2756
                from_main_rx_clone,
2757
                to_main_tx,
2758
                _to_main_rx1,
2759
                mut state_lock,
2760
                hsd,
2761
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2762
            let genesis_block = Block::genesis(network);
2763
            let peer_address = get_dummy_socket_address(0);
2764

2765
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2766
                &genesis_block,
2767
                Timestamp::hours(1),
2768
                StdRng::seed_from_u64(5550001).random(),
2769
                network,
2770
            )
2771
            .await;
2772
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2773
                &block_1,
2774
                Timestamp::hours(1),
2775
                StdRng::seed_from_u64(5550002).random(),
2776
                network,
2777
            )
2778
            .await;
2779
            assert_ne!(block_2_a.hash(), block_2_b.hash());
2780

2781
            state_lock.set_new_tip(block_1.clone()).await?;
2782
            state_lock.set_new_tip(block_2_a.clone()).await?;
2783
            state_lock.set_new_tip(block_2_b.clone()).await?;
2784
            state_lock.set_new_tip(block_3_b.clone()).await?;
2785
            state_lock.set_new_tip(block_3_a.clone()).await?;
2786

2787
            let mock = Mock::new(vec![
2788
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2789
                Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
2790
                Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
2791
                Action::Write(PeerMessage::Block(Box::new(
2792
                    block_3_a.clone().try_into().unwrap(),
2793
                ))),
2794
                Action::Read(PeerMessage::Bye),
2795
            ]);
2796

2797
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2798
                to_main_tx.clone(),
2799
                state_lock.clone(),
2800
                peer_address,
2801
                hsd,
2802
                false,
2803
                1,
2804
                block_3_a.header().timestamp,
2805
            );
2806

2807
            // This will return error if seen read/write order does not match that of the
2808
            // mocked object.
2809
            peer_loop_handler
2810
                .run_wrapper(mock, from_main_rx_clone)
2811
                .await?;
2812

2813
            Ok(())
2814
        }
2815

2816
        #[traced_test]
2817
        #[apply(shared_tokio_runtime)]
2818
        async fn receival_of_block_notification_height_1() {
2819
            // Scenario: client only knows genesis block. Then receives block
2820
            // notification of height 1. Must request block 1.
2821
            let network = Network::Main;
2822
            let mut rng = StdRng::seed_from_u64(5552401);
2823
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2824
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2825
                    .await
2826
                    .unwrap();
2827
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2828
            let notification_height1 = (&block_1).into();
2829
            let mock = Mock::new(vec![
2830
                Action::Read(PeerMessage::BlockNotification(notification_height1)),
2831
                Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2832
                Action::Read(PeerMessage::Bye),
2833
            ]);
2834

2835
            let peer_address = get_dummy_socket_address(0);
2836
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2837
                to_main_tx.clone(),
2838
                state_lock.clone(),
2839
                peer_address,
2840
                hsd,
2841
                false,
2842
                1,
2843
                block_1.header().timestamp,
2844
            );
2845
            peer_loop_handler
2846
                .run_wrapper(mock, from_main_rx_clone)
2847
                .await
2848
                .unwrap();
2849

2850
            drop(to_main_rx1);
2851
        }
2852

2853
        #[traced_test]
2854
        #[apply(shared_tokio_runtime)]
2855
        async fn receive_block_request_by_height_block_7() {
2856
            // Scenario: client only knows blocks up to height 7. Then receives block-
2857
            // request-by-height for height 7. Must respond with block 7.
2858
            let network = Network::Main;
2859
            let mut rng = StdRng::seed_from_u64(5552401);
2860
            let (
2861
                _peer_broadcast_tx,
2862
                from_main_rx_clone,
2863
                to_main_tx,
2864
                to_main_rx1,
2865
                mut state_lock,
2866
                hsd,
2867
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
2868
                .await
2869
                .unwrap();
2870
            let genesis_block = Block::genesis(network);
2871
            let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2872
                &genesis_block,
2873
                Timestamp::hours(1),
2874
                rng.random(),
2875
                network,
2876
            )
2877
            .await;
2878
            let block7 = blocks.last().unwrap().to_owned();
2879
            let tip_height: u64 = block7.header().height.into();
2880
            assert_eq!(7, tip_height);
2881

2882
            for block in &blocks {
2883
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
2884
            }
2885

2886
            let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2887
            let mock = Mock::new(vec![
2888
                Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2889
                Action::Write(block7_response),
2890
                Action::Read(PeerMessage::Bye),
2891
            ]);
2892

2893
            let peer_address = get_dummy_socket_address(0);
2894
            let mut peer_loop_handler = PeerLoopHandler::new(
2895
                to_main_tx.clone(),
2896
                state_lock.clone(),
2897
                peer_address,
2898
                hsd,
2899
                false,
2900
                1,
2901
            );
2902
            peer_loop_handler
2903
                .run_wrapper(mock, from_main_rx_clone)
2904
                .await
2905
                .unwrap();
2906

2907
            drop(to_main_rx1);
2908
        }
2909

2910
        #[traced_test]
2911
        #[apply(shared_tokio_runtime)]
2912
        async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2913
            // Scenario: client only knows genesis block. Then receives block 1.
2914

2915
            let network = Network::Main;
2916
            let mut rng = StdRng::seed_from_u64(5550001);
2917
            let (
2918
                _peer_broadcast_tx,
2919
                from_main_rx_clone,
2920
                to_main_tx,
2921
                mut to_main_rx1,
2922
                state_lock,
2923
                hsd,
2924
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2925
            let peer_address = get_dummy_socket_address(0);
2926

2927
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2928
            let mock = Mock::new(vec![
2929
                Action::Read(PeerMessage::Block(Box::new(
2930
                    block_1.clone().try_into().unwrap(),
2931
                ))),
2932
                Action::Read(PeerMessage::Bye),
2933
            ]);
2934

2935
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2936
                to_main_tx.clone(),
2937
                state_lock.clone(),
2938
                peer_address,
2939
                hsd,
2940
                false,
2941
                1,
2942
                block_1.header().timestamp,
2943
            );
2944
            peer_loop_handler
2945
                .run_wrapper(mock, from_main_rx_clone)
2946
                .await?;
2947

2948
            // Verify that a block was sent to `main_loop`
2949
            match to_main_rx1.recv().await {
2950
                Some(PeerTaskToMain::NewBlocks(_block)) => (),
2951
                _ => bail!("Did not find msg sent to main task"),
2952
            };
2953

2954
            match to_main_rx1.recv().await {
2955
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2956
                _ => bail!("Must receive remove of peer block max height"),
2957
            }
2958

2959
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2960
                bail!("peer map must be empty after closing connection gracefully");
2961
            }
2962

2963
            Ok(())
2964
        }
2965

2966
        #[traced_test]
2967
        #[apply(shared_tokio_runtime)]
2968
        async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
2969
            // In this scenario, the client only knows the genesis block (block 0) and then
2970
            // receives block 2, meaning that block 1 will have to be requested.
2971

2972
            let network = Network::Main;
2973
            let (
2974
                _peer_broadcast_tx,
2975
                from_main_rx_clone,
2976
                to_main_tx,
2977
                mut to_main_rx1,
2978
                state_lock,
2979
                hsd,
2980
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2981
            let peer_address = get_dummy_socket_address(0);
2982
            let genesis_block: Block = state_lock
2983
                .lock_guard()
2984
                .await
2985
                .chain
2986
                .archival_state()
2987
                .get_tip()
2988
                .await;
2989
            let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
2990
                &genesis_block,
2991
                Timestamp::hours(1),
2992
                StdRng::seed_from_u64(5550001).random(),
2993
                network,
2994
            )
2995
            .await;
2996

2997
            let mock = Mock::new(vec![
2998
                Action::Read(PeerMessage::Block(Box::new(
2999
                    block_2.clone().try_into().unwrap(),
3000
                ))),
3001
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3002
                Action::Read(PeerMessage::Block(Box::new(
3003
                    block_1.clone().try_into().unwrap(),
3004
                ))),
3005
                Action::Read(PeerMessage::Bye),
3006
            ]);
3007

3008
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3009
                to_main_tx.clone(),
3010
                state_lock.clone(),
3011
                peer_address,
3012
                hsd,
3013
                true,
3014
                1,
3015
                block_2.header().timestamp,
3016
            );
3017
            peer_loop_handler
3018
                .run_wrapper(mock, from_main_rx_clone)
3019
                .await?;
3020

3021
            match to_main_rx1.recv().await {
3022
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3023
                    if blocks[0].hash() != block_1.hash() {
3024
                        bail!("1st received block by main loop must be block 1");
3025
                    }
3026
                    if blocks[1].hash() != block_2.hash() {
3027
                        bail!("2nd received block by main loop must be block 2");
3028
                    }
3029
                }
3030
                _ => bail!("Did not find msg sent to main task 1"),
3031
            };
3032
            match to_main_rx1.recv().await {
3033
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3034
                _ => bail!("Must receive remove of peer block max height"),
3035
            }
3036

3037
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3038
                bail!("peer map must be empty after closing connection gracefully");
3039
            }
3040

3041
            Ok(())
3042
        }
3043

3044
        #[traced_test]
3045
        #[apply(shared_tokio_runtime)]
3046
        async fn prevent_ram_exhaustion_test() -> Result<()> {
3047
            // In this scenario the peer sends more blocks than the client allows to store in the
3048
            // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
3049
            // process as the alternative is that the program will crash because it runs out of RAM.
3050

3051
            let network = Network::Main;
3052
            let mut rng = StdRng::seed_from_u64(5550001);
3053
            let (
3054
                _peer_broadcast_tx,
3055
                from_main_rx_clone,
3056
                to_main_tx,
3057
                mut to_main_rx1,
3058
                mut state_lock,
3059
                _hsd,
3060
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3061
            let genesis_block = Block::genesis(network);
3062

3063
            // Restrict max number of blocks held in memory to 2.
3064
            let mut cli = state_lock.cli().clone();
3065
            cli.sync_mode_threshold = 2;
3066
            state_lock.set_cli(cli).await;
3067

3068
            let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(network, 1);
3069
            let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3070
                &genesis_block,
3071
                Timestamp::hours(1),
3072
                rng.random(),
3073
                network,
3074
            )
3075
            .await;
3076
            state_lock.set_new_tip(block_1.clone()).await?;
3077

3078
            let mock = Mock::new(vec![
3079
                Action::Read(PeerMessage::Block(Box::new(
3080
                    block_4.clone().try_into().unwrap(),
3081
                ))),
3082
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3083
                Action::Read(PeerMessage::Block(Box::new(
3084
                    block_3.clone().try_into().unwrap(),
3085
                ))),
3086
                Action::Read(PeerMessage::Bye),
3087
            ]);
3088

3089
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3090
                to_main_tx.clone(),
3091
                state_lock.clone(),
3092
                peer_address1,
3093
                hsd1,
3094
                true,
3095
                1,
3096
                block_4.header().timestamp,
3097
            );
3098
            peer_loop_handler
3099
                .run_wrapper(mock, from_main_rx_clone)
3100
                .await?;
3101

3102
            match to_main_rx1.recv().await {
3103
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3104
                _ => bail!("Must receive remove of peer block max height"),
3105
            }
3106

3107
            // Verify that no block is sent to main loop.
3108
            match to_main_rx1.try_recv() {
3109
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
3110
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
3111
        };
3112
            drop(to_main_tx);
3113

3114
            // Verify that peer is sanctioned for failed fork reconciliation attempt
3115
            assert!(state_lock
3116
                .lock_guard()
3117
                .await
3118
                .net
3119
                .get_peer_standing_from_database(peer_address1.ip())
3120
                .await
3121
                .unwrap()
3122
                .standing
3123
                .is_negative());
3124

3125
            Ok(())
3126
        }
3127

3128
        #[traced_test]
3129
        #[apply(shared_tokio_runtime)]
3130
        async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3131
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3132
            // then receives block 4, meaning that block 3 and 2 will have to be requested.
3133

3134
            let network = Network::Main;
3135
            let (
3136
                _peer_broadcast_tx,
3137
                from_main_rx_clone,
3138
                to_main_tx,
3139
                mut to_main_rx1,
3140
                mut state_lock,
3141
                hsd,
3142
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3143
                .await
3144
                .unwrap();
3145
            let peer_address: SocketAddr = get_dummy_socket_address(0);
3146
            let genesis_block = Block::genesis(network);
3147
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3148
                &genesis_block,
3149
                Timestamp::hours(1),
3150
                StdRng::seed_from_u64(5550001).random(),
3151
                network,
3152
            )
3153
            .await;
3154
            state_lock.set_new_tip(block_1.clone()).await.unwrap();
3155

3156
            let mock = Mock::new(vec![
3157
                Action::Read(PeerMessage::Block(Box::new(
3158
                    block_4.clone().try_into().unwrap(),
3159
                ))),
3160
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3161
                Action::Read(PeerMessage::Block(Box::new(
3162
                    block_3.clone().try_into().unwrap(),
3163
                ))),
3164
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3165
                Action::Read(PeerMessage::Block(Box::new(
3166
                    block_2.clone().try_into().unwrap(),
3167
                ))),
3168
                Action::Read(PeerMessage::Bye),
3169
            ]);
3170

3171
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3172
                to_main_tx.clone(),
3173
                state_lock.clone(),
3174
                peer_address,
3175
                hsd,
3176
                true,
3177
                1,
3178
                block_4.header().timestamp,
3179
            );
3180
            peer_loop_handler
3181
                .run_wrapper(mock, from_main_rx_clone)
3182
                .await
3183
                .unwrap();
3184

3185
            let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3186
                panic!("Did not find msg sent to main task");
3187
            };
3188
            assert_eq!(blocks[0].hash(), block_2.hash());
3189
            assert_eq!(blocks[1].hash(), block_3.hash());
3190
            assert_eq!(blocks[2].hash(), block_4.hash());
3191

3192
            let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3193
                panic!("Must receive remove of peer block max height");
3194
            };
3195

3196
            assert!(
3197
                state_lock.lock_guard().await.net.peer_map.is_empty(),
3198
                "peer map must be empty after closing connection gracefully"
3199
            );
3200
        }
3201

3202
        #[traced_test]
3203
        #[apply(shared_tokio_runtime)]
3204
        async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
3205
            // In this scenario, the client only knows the genesis block (block 0) and then
3206
            // receives block 3, meaning that block 2 and 1 will have to be requested.
3207

3208
            let network = Network::Main;
3209
            let (
3210
                _peer_broadcast_tx,
3211
                from_main_rx_clone,
3212
                to_main_tx,
3213
                mut to_main_rx1,
3214
                state_lock,
3215
                hsd,
3216
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3217
            let peer_address = get_dummy_socket_address(0);
3218
            let genesis_block = Block::genesis(network);
3219

3220
            let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
3221
                &genesis_block,
3222
                Timestamp::hours(1),
3223
                StdRng::seed_from_u64(5550001).random(),
3224
                network,
3225
            )
3226
            .await;
3227

3228
            let mock = Mock::new(vec![
3229
                Action::Read(PeerMessage::Block(Box::new(
3230
                    block_3.clone().try_into().unwrap(),
3231
                ))),
3232
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3233
                Action::Read(PeerMessage::Block(Box::new(
3234
                    block_2.clone().try_into().unwrap(),
3235
                ))),
3236
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3237
                Action::Read(PeerMessage::Block(Box::new(
3238
                    block_1.clone().try_into().unwrap(),
3239
                ))),
3240
                Action::Read(PeerMessage::Bye),
3241
            ]);
3242

3243
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3244
                to_main_tx.clone(),
3245
                state_lock.clone(),
3246
                peer_address,
3247
                hsd,
3248
                true,
3249
                1,
3250
                block_3.header().timestamp,
3251
            );
3252
            peer_loop_handler
3253
                .run_wrapper(mock, from_main_rx_clone)
3254
                .await?;
3255

3256
            match to_main_rx1.recv().await {
3257
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3258
                    if blocks[0].hash() != block_1.hash() {
3259
                        bail!("1st received block by main loop must be block 1");
3260
                    }
3261
                    if blocks[1].hash() != block_2.hash() {
3262
                        bail!("2nd received block by main loop must be block 2");
3263
                    }
3264
                    if blocks[2].hash() != block_3.hash() {
3265
                        bail!("3rd received block by main loop must be block 3");
3266
                    }
3267
                }
3268
                _ => bail!("Did not find msg sent to main task"),
3269
            };
3270
            match to_main_rx1.recv().await {
3271
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3272
                _ => bail!("Must receive remove of peer block max height"),
3273
            }
3274

3275
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3276
                bail!("peer map must be empty after closing connection gracefully");
3277
            }
3278

3279
            Ok(())
3280
        }
3281

3282
        #[traced_test]
3283
        #[apply(shared_tokio_runtime)]
3284
        async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3285
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3286
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3287
            // But the requests are interrupted by the peer sending another message: a new block
3288
            // notification.
3289

3290
            let network = Network::Main;
3291
            let (
3292
                _peer_broadcast_tx,
3293
                from_main_rx_clone,
3294
                to_main_tx,
3295
                mut to_main_rx1,
3296
                mut state_lock,
3297
                hsd,
3298
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3299
            let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3300
            let genesis_block: Block = state_lock
3301
                .lock_guard()
3302
                .await
3303
                .chain
3304
                .archival_state()
3305
                .get_tip()
3306
                .await;
3307

3308
            let [block_1, block_2, block_3, block_4, block_5] =
3309
                fake_valid_sequence_of_blocks_for_tests(
3310
                    &genesis_block,
3311
                    Timestamp::hours(1),
3312
                    StdRng::seed_from_u64(5550001).random(),
3313
                    network,
3314
                )
3315
                .await;
3316
            state_lock.set_new_tip(block_1.clone()).await?;
3317

3318
            let mock = Mock::new(vec![
3319
                Action::Read(PeerMessage::Block(Box::new(
3320
                    block_4.clone().try_into().unwrap(),
3321
                ))),
3322
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3323
                Action::Read(PeerMessage::Block(Box::new(
3324
                    block_3.clone().try_into().unwrap(),
3325
                ))),
3326
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3327
                //
3328
                // Now make the interruption of the block reconciliation process
3329
                Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3330
                //
3331
                // Complete the block reconciliation process by requesting the last block
3332
                // in this process, to get back to a mutually known block.
3333
                Action::Read(PeerMessage::Block(Box::new(
3334
                    block_2.clone().try_into().unwrap(),
3335
                ))),
3336
                //
3337
                // Then anticipate the request of the block that was announced
3338
                // in the interruption.
3339
                // Note that we cannot anticipate the response, as only the main
3340
                // task writes to the database. And the database needs to be updated
3341
                // for the handling of block 5 to be done correctly.
3342
                Action::Write(PeerMessage::BlockRequestByHeight(
3343
                    block_5.kernel.header.height,
3344
                )),
3345
                Action::Read(PeerMessage::Bye),
3346
            ]);
3347

3348
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3349
                to_main_tx.clone(),
3350
                state_lock.clone(),
3351
                peer_socket_address,
3352
                hsd,
3353
                false,
3354
                1,
3355
                block_5.header().timestamp,
3356
            );
3357
            peer_loop_handler
3358
                .run_wrapper(mock, from_main_rx_clone)
3359
                .await?;
3360

3361
            match to_main_rx1.recv().await {
3362
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3363
                    if blocks[0].hash() != block_2.hash() {
3364
                        bail!("1st received block by main loop must be block 1");
3365
                    }
3366
                    if blocks[1].hash() != block_3.hash() {
3367
                        bail!("2nd received block by main loop must be block 2");
3368
                    }
3369
                    if blocks[2].hash() != block_4.hash() {
3370
                        bail!("3rd received block by main loop must be block 3");
3371
                    }
3372
                }
3373
                _ => bail!("Did not find msg sent to main task"),
3374
            };
3375
            match to_main_rx1.recv().await {
3376
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3377
                _ => bail!("Must receive remove of peer block max height"),
3378
            }
3379

3380
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3381
                bail!("peer map must be empty after closing connection gracefully");
3382
            }
3383

3384
            Ok(())
3385
        }
3386

3387
        #[traced_test]
3388
        #[apply(shared_tokio_runtime)]
3389
        async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3390
            // In this scenario, the client knows the genesis block (block 0) and block 1, it
3391
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3392
            // But the requests are interrupted by the peer sending another message: a request
3393
            // for a list of peers.
3394

3395
            let network = Network::Main;
3396
            let (
3397
                _peer_broadcast_tx,
3398
                from_main_rx_clone,
3399
                to_main_tx,
3400
                mut to_main_rx1,
3401
                mut state_lock,
3402
                _hsd,
3403
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3404
            let genesis_block = Block::genesis(network);
3405
            let peer_infos: Vec<PeerInfo> = state_lock
3406
                .lock_guard()
3407
                .await
3408
                .net
3409
                .peer_map
3410
                .clone()
3411
                .into_values()
3412
                .collect::<Vec<_>>();
3413

3414
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3415
                &genesis_block,
3416
                Timestamp::hours(1),
3417
                StdRng::seed_from_u64(5550001).random(),
3418
                network,
3419
            )
3420
            .await;
3421
            state_lock.set_new_tip(block_1.clone()).await?;
3422

3423
            let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3424
            let expected_peer_list_resp = vec![
3425
                (
3426
                    peer_infos[0].listen_address().unwrap(),
3427
                    peer_infos[0].instance_id(),
3428
                ),
3429
                (sa_1, hsd_1.instance_id),
3430
            ];
3431
            let mock = Mock::new(vec![
3432
                Action::Read(PeerMessage::Block(Box::new(
3433
                    block_4.clone().try_into().unwrap(),
3434
                ))),
3435
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3436
                Action::Read(PeerMessage::Block(Box::new(
3437
                    block_3.clone().try_into().unwrap(),
3438
                ))),
3439
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3440
                //
3441
                // Now make the interruption of the block reconciliation process
3442
                Action::Read(PeerMessage::PeerListRequest),
3443
                //
3444
                // Answer the request for a peer list
3445
                Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3446
                //
3447
                // Complete the block reconciliation process by requesting the last block
3448
                // in this process, to get back to a mutually known block.
3449
                Action::Read(PeerMessage::Block(Box::new(
3450
                    block_2.clone().try_into().unwrap(),
3451
                ))),
3452
                Action::Read(PeerMessage::Bye),
3453
            ]);
3454

3455
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3456
                to_main_tx,
3457
                state_lock.clone(),
3458
                sa_1,
3459
                hsd_1,
3460
                true,
3461
                1,
3462
                block_4.header().timestamp,
3463
            );
3464
            peer_loop_handler
3465
                .run_wrapper(mock, from_main_rx_clone)
3466
                .await?;
3467

3468
            // Verify that blocks are sent to `main_loop` in expected ordering
3469
            match to_main_rx1.recv().await {
3470
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3471
                    if blocks[0].hash() != block_2.hash() {
3472
                        bail!("1st received block by main loop must be block 1");
3473
                    }
3474
                    if blocks[1].hash() != block_3.hash() {
3475
                        bail!("2nd received block by main loop must be block 2");
3476
                    }
3477
                    if blocks[2].hash() != block_4.hash() {
3478
                        bail!("3rd received block by main loop must be block 3");
3479
                    }
3480
                }
3481
                _ => bail!("Did not find msg sent to main task"),
3482
            };
3483

3484
            assert_eq!(
3485
                1,
3486
                state_lock.lock_guard().await.net.peer_map.len(),
3487
                "One peer must remain in peer list after peer_1 closed gracefully"
3488
            );
3489

3490
            Ok(())
3491
        }
3492
    }
3493

3494
    mod transactions {
3495
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToProofCollection;
3496
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToSingleProof;
3497
        use crate::models::blockchain::transaction::primitive_witness::PrimitiveWitness;
3498
        use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
3499
        use crate::tests::shared::blocks::fake_deterministic_successor;
3500
        use crate::tests::shared::mock_tx::genesis_tx_with_proof_type;
3501
        use crate::triton_vm_job_queue::vm_job_queue;
3502

3503
        use super::*;
3504

3505
        #[traced_test]
3506
        #[apply(shared_tokio_runtime)]
3507
        async fn receive_transaction_request() {
3508
            let network = Network::Main;
3509
            let dummy_tx = invalid_empty_single_proof_transaction();
3510
            let txid = dummy_tx.kernel.txid();
3511

3512
            for transaction_is_known in [false, true] {
3513
                let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3514
                    get_test_genesis_setup(network, 1, cli_args::Args::default())
3515
                        .await
3516
                        .unwrap();
3517
                if transaction_is_known {
3518
                    state_lock
3519
                        .lock_guard_mut()
3520
                        .await
3521
                        .mempool_insert(dummy_tx.clone(), UpgradePriority::Irrelevant)
3522
                        .await;
3523
                }
3524

3525
                let mock = if transaction_is_known {
3526
                    Mock::new(vec![
3527
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3528
                        Action::Write(PeerMessage::Transaction(Box::new(
3529
                            (&dummy_tx).try_into().unwrap(),
3530
                        ))),
3531
                        Action::Read(PeerMessage::Bye),
3532
                    ])
3533
                } else {
3534
                    Mock::new(vec![
3535
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3536
                        Action::Read(PeerMessage::Bye),
3537
                    ])
3538
                };
3539

3540
                let hsd = get_dummy_handshake_data_for_genesis(network);
3541
                let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3542
                let mut peer_loop_handler = PeerLoopHandler::new(
3543
                    to_main_tx,
3544
                    state_lock,
3545
                    get_dummy_socket_address(0),
3546
                    hsd,
3547
                    true,
3548
                    1,
3549
                );
3550

3551
                peer_loop_handler
3552
                    .run(mock, from_main_rx, &mut peer_state)
3553
                    .await
3554
                    .unwrap();
3555
            }
3556
        }
3557

3558
        #[traced_test]
3559
        #[apply(shared_tokio_runtime)]
3560
        async fn empty_mempool_request_tx_test() {
3561
            // In this scenario the client receives a transaction notification from
3562
            // a peer of a transaction it doesn't know; the client must then request it.
3563

3564
            let network = Network::Main;
3565
            let (
3566
                _peer_broadcast_tx,
3567
                from_main_rx_clone,
3568
                to_main_tx,
3569
                mut to_main_rx1,
3570
                state_lock,
3571
                _hsd,
3572
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3573
                .await
3574
                .unwrap();
3575

3576
            let spending_key = state_lock
3577
                .lock_guard()
3578
                .await
3579
                .wallet_state
3580
                .wallet_entropy
3581
                .nth_symmetric_key_for_tests(0);
3582
            let genesis_block = Block::genesis(network);
3583
            let now = genesis_block.kernel.header.timestamp;
3584
            let config = TxCreationConfig::default()
3585
                .recover_change_off_chain(spending_key.into())
3586
                .with_prover_capability(TxProvingCapability::ProofCollection);
3587
            let consensus_rule_set = ConsensusRuleSet::infer_from(network, BlockHeight::genesis());
3588
            let transaction_1: Transaction = state_lock
3589
                .api()
3590
                .tx_initiator_internal()
3591
                .create_transaction(
3592
                    Default::default(),
3593
                    NativeCurrencyAmount::coins(0),
3594
                    now,
3595
                    config,
3596
                    consensus_rule_set,
3597
                )
3598
                .await
3599
                .unwrap()
3600
                .transaction
3601
                .into();
3602

3603
            // Build the resulting transaction notification
3604
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3605
            let mock = Mock::new(vec![
3606
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3607
                Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3608
                Action::Read(PeerMessage::Transaction(Box::new(
3609
                    (&transaction_1).try_into().unwrap(),
3610
                ))),
3611
                Action::Read(PeerMessage::Bye),
3612
            ]);
3613

3614
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3615

3616
            // Mock a timestamp to allow transaction to be considered valid
3617
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3618
                to_main_tx,
3619
                state_lock.clone(),
3620
                get_dummy_socket_address(0),
3621
                hsd_1,
3622
                true,
3623
                1,
3624
                now,
3625
            );
3626

3627
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3628

3629
            assert!(
3630
                state_lock.lock_guard().await.mempool.is_empty(),
3631
                "Mempool must be empty at init"
3632
            );
3633
            peer_loop_handler
3634
                .run(mock, from_main_rx_clone, &mut peer_state)
3635
                .await
3636
                .unwrap();
3637

3638
            // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3639
            // by the `main_loop`.
3640
            match to_main_rx1.recv().await {
3641
                Some(PeerTaskToMain::Transaction(_)) => (),
3642
                _ => panic!("Must receive remove of peer block max height"),
3643
            };
3644
        }
3645

3646
        #[traced_test]
3647
        #[apply(shared_tokio_runtime)]
3648
        async fn populated_mempool_request_tx_test() -> Result<()> {
3649
            // In this scenario the peer is informed of a transaction that it already knows
3650

3651
            let network = Network::Main;
3652
            let (
3653
                _peer_broadcast_tx,
3654
                from_main_rx_clone,
3655
                to_main_tx,
3656
                mut to_main_rx1,
3657
                mut state_lock,
3658
                _hsd,
3659
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3660
                .await
3661
                .unwrap();
3662
            let spending_key = state_lock
3663
                .lock_guard()
3664
                .await
3665
                .wallet_state
3666
                .wallet_entropy
3667
                .nth_symmetric_key_for_tests(0);
3668

3669
            let genesis_block = Block::genesis(network);
3670
            let now = genesis_block.kernel.header.timestamp;
3671
            let config = TxCreationConfig::default()
3672
                .recover_change_off_chain(spending_key.into())
3673
                .with_prover_capability(TxProvingCapability::ProofCollection);
3674
            let consensus_rule_set = ConsensusRuleSet::infer_from(network, BlockHeight::genesis());
3675
            let transaction_1: Transaction = state_lock
3676
                .api()
3677
                .tx_initiator_internal()
3678
                .create_transaction(
3679
                    Default::default(),
3680
                    NativeCurrencyAmount::coins(0),
3681
                    now,
3682
                    config,
3683
                    consensus_rule_set,
3684
                )
3685
                .await
3686
                .unwrap()
3687
                .transaction
3688
                .into();
3689

3690
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3691
            let mut peer_loop_handler = PeerLoopHandler::new(
3692
                to_main_tx,
3693
                state_lock.clone(),
3694
                get_dummy_socket_address(0),
3695
                hsd_1,
3696
                true,
3697
                1,
3698
            );
3699
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3700

3701
            assert!(
3702
                state_lock.lock_guard().await.mempool.is_empty(),
3703
                "Mempool must be empty at init"
3704
            );
3705
            state_lock
3706
                .lock_guard_mut()
3707
                .await
3708
                .mempool_insert(transaction_1.clone(), UpgradePriority::Irrelevant)
3709
                .await;
3710
            assert!(
3711
                !state_lock.lock_guard().await.mempool.is_empty(),
3712
                "Mempool must be non-empty after insertion"
3713
            );
3714

3715
            // Run the peer loop and verify expected exchange -- namely that the
3716
            // tx notification is received and the the transaction is *not*
3717
            // requested.
3718
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3719
            let mock = Mock::new(vec![
3720
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3721
                Action::Read(PeerMessage::Bye),
3722
            ]);
3723
            peer_loop_handler
3724
                .run(mock, from_main_rx_clone, &mut peer_state)
3725
                .await
3726
                .unwrap();
3727

3728
            // nothing is allowed to be sent to `main_loop`
3729
            match to_main_rx1.try_recv() {
3730
                Err(TryRecvError::Empty) => (),
3731
                Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3732
                Ok(_) => panic!("to_main channel must be empty"),
3733
            };
3734
            Ok(())
3735
        }
3736

3737
        #[traced_test]
3738
        #[apply(shared_tokio_runtime)]
3739
        async fn accepts_tx_with_updated_mutator_set() {
3740
            // Scenario: node has transaction in mempool and receives
3741
            // transaction notification for the same transaction with an updated
3742
            // mutator set. The node must request the new transaction and it
3743
            // must be passed on to main loop.
3744
            //
3745
            // Both ProofCollection and SingleProof backed transactions are
3746
            // tested.
3747

3748
            enum ProofType {
3749
                ProofCollection,
3750
                SingleProof,
3751
            }
3752

3753
            let network = Network::Main;
3754

3755
            for proof_type in [ProofType::ProofCollection, ProofType::SingleProof] {
3756
                let proof_job_options = TritonVmProofJobOptions::default();
3757
                let upgrade =
3758
                    async |primitive_witness: PrimitiveWitness,
3759
                           consensus_rule_set: ConsensusRuleSet| {
3760
                        match proof_type {
3761
                            ProofType::ProofCollection => {
3762
                                PrimitiveWitnessToProofCollection { primitive_witness }
3763
                                    .upgrade(vm_job_queue(), &proof_job_options)
3764
                                    .await
3765
                                    .unwrap()
3766
                            }
3767
                            ProofType::SingleProof => {
3768
                                PrimitiveWitnessToSingleProof { primitive_witness }
3769
                                    .upgrade(vm_job_queue(), &proof_job_options, consensus_rule_set)
3770
                                    .await
3771
                                    .unwrap()
3772
                            }
3773
                        }
3774
                    };
3775

3776
                let (
3777
                    _peer_broadcast_tx,
3778
                    from_main_rx_clone,
3779
                    to_main_tx,
3780
                    mut to_main_rx1,
3781
                    mut state_lock,
3782
                    _hsd,
3783
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3784
                    .await
3785
                    .unwrap();
3786
                let consensus_rule_set =
3787
                    ConsensusRuleSet::infer_from(network, BlockHeight::genesis());
3788
                let fee = NativeCurrencyAmount::from_nau(500);
3789
                let pw_genesis =
3790
                    genesis_tx_with_proof_type(TxProvingCapability::PrimitiveWitness, network, fee)
3791
                        .await
3792
                        .proof
3793
                        .clone()
3794
                        .into_primitive_witness();
3795

3796
                let tx_synced_to_genesis = upgrade(pw_genesis.clone(), consensus_rule_set).await;
3797

3798
                let genesis_block = Block::genesis(network);
3799
                let block1 = fake_deterministic_successor(&genesis_block, network).await;
3800
                state_lock
3801
                    .lock_guard_mut()
3802
                    .await
3803
                    .set_new_tip(block1.clone())
3804
                    .await
3805
                    .unwrap();
3806

3807
                state_lock
3808
                    .lock_guard_mut()
3809
                    .await
3810
                    .mempool_insert(tx_synced_to_genesis, UpgradePriority::Irrelevant)
3811
                    .await;
3812

3813
                // Mempool should now contain the unsynced transaction. Tip is block 1.
3814
                let pw_block1 =
3815
                    pw_genesis.update_with_new_ms_data(block1.mutator_set_update().unwrap());
3816
                let tx_synced_to_block1 = upgrade(pw_block1, consensus_rule_set).await;
3817

3818
                let tx_notification: TransactionNotification =
3819
                    (&tx_synced_to_block1).try_into().unwrap();
3820
                let mock = Mock::new(vec![
3821
                    Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3822
                    Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3823
                    Action::Read(PeerMessage::Transaction(Box::new(
3824
                        (&tx_synced_to_block1).try_into().unwrap(),
3825
                    ))),
3826
                    Action::Read(PeerMessage::Bye),
3827
                ]);
3828

3829
                // Mock a timestamp to allow transaction to be considered valid
3830
                let now = tx_synced_to_block1.kernel.timestamp;
3831
                let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3832
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3833
                    to_main_tx,
3834
                    state_lock.clone(),
3835
                    get_dummy_socket_address(0),
3836
                    hsd_1,
3837
                    true,
3838
                    1,
3839
                    now,
3840
                );
3841

3842
                let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3843
                peer_loop_handler
3844
                    .run(mock, from_main_rx_clone, &mut peer_state)
3845
                    .await
3846
                    .unwrap();
3847

3848
                // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3849
                // by the `main_loop`.
3850
                match to_main_rx1.recv().await {
3851
                    Some(PeerTaskToMain::Transaction(_)) => (),
3852
                    _ => panic!("Main loop must receive new transaction"),
3853
                };
3854
            }
3855
        }
3856
    }
3857

3858
    mod block_proposals {
3859
        use super::*;
3860

3861
        struct TestSetup {
3862
            peer_loop_handler: PeerLoopHandler,
3863
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3864
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3865
            peer_state: MutablePeerState,
3866
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3867
            genesis_block: Block,
3868
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3869
        }
3870

3871
        async fn genesis_setup(network: Network) -> TestSetup {
3872
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
3873
                get_test_genesis_setup(network, 0, cli_args::Args::default())
3874
                    .await
3875
                    .unwrap();
3876
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
3877
            let peer_loop_handler = PeerLoopHandler::new(
3878
                to_main_tx.clone(),
3879
                alice.clone(),
3880
                get_dummy_socket_address(0),
3881
                peer_hsd,
3882
                true,
3883
                1,
3884
            );
3885
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
3886

3887
            // (peer_loop_handler, to_main_rx1)
3888
            TestSetup {
3889
                peer_broadcast_tx,
3890
                peer_loop_handler,
3891
                to_main_rx,
3892
                from_main_rx,
3893
                peer_state,
3894
                to_main_tx,
3895
                genesis_block: Block::genesis(network),
3896
            }
3897
        }
3898

3899
        #[traced_test]
3900
        #[apply(shared_tokio_runtime)]
3901
        async fn accept_block_proposal_height_one() {
3902
            // Node knows genesis block, receives a block proposal for block 1
3903
            // and must accept this. Verify that main loop is informed of block
3904
            // proposal.
3905
            let TestSetup {
3906
                peer_broadcast_tx,
3907
                mut peer_loop_handler,
3908
                mut to_main_rx,
3909
                from_main_rx,
3910
                mut peer_state,
3911
                to_main_tx,
3912
                genesis_block,
3913
            } = genesis_setup(Network::Main).await;
3914
            let block1 = fake_valid_block_for_tests(
3915
                &peer_loop_handler.global_state_lock,
3916
                StdRng::seed_from_u64(5550001).random(),
3917
            )
3918
            .await;
3919

3920
            let mock = Mock::new(vec![
3921
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
3922
                Action::Read(PeerMessage::Bye),
3923
            ]);
3924
            peer_loop_handler
3925
                .run(mock, from_main_rx, &mut peer_state)
3926
                .await
3927
                .unwrap();
3928

3929
            match to_main_rx.try_recv().unwrap() {
3930
                PeerTaskToMain::BlockProposal(block) => {
3931
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
3932
                }
3933
                _ => panic!("Expected main loop to be informed of block proposal"),
3934
            };
3935

3936
            drop(to_main_tx);
3937
            drop(peer_broadcast_tx);
3938
        }
3939

3940
        #[traced_test]
3941
        #[apply(shared_tokio_runtime)]
3942
        async fn accept_block_proposal_notification_height_one() {
3943
            // Node knows genesis block, receives a block proposal notification
3944
            // for block 1 and must accept this by requesting the block
3945
            // proposal from peer.
3946
            let TestSetup {
3947
                peer_broadcast_tx,
3948
                mut peer_loop_handler,
3949
                to_main_rx: _,
3950
                from_main_rx,
3951
                mut peer_state,
3952
                to_main_tx,
3953
                ..
3954
            } = genesis_setup(Network::Main).await;
3955
            let block1 = fake_valid_block_for_tests(
3956
                &peer_loop_handler.global_state_lock,
3957
                StdRng::seed_from_u64(5550001).random(),
3958
            )
3959
            .await;
3960

3961
            let mock = Mock::new(vec![
3962
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
3963
                Action::Write(PeerMessage::BlockProposalRequest(
3964
                    BlockProposalRequest::new(block1.body().mast_hash()),
3965
                )),
3966
                Action::Read(PeerMessage::Bye),
3967
            ]);
3968
            peer_loop_handler
3969
                .run(mock, from_main_rx, &mut peer_state)
3970
                .await
3971
                .unwrap();
3972

3973
            drop(to_main_tx);
3974
            drop(peer_broadcast_tx);
3975
        }
3976
    }
3977

3978
    mod proof_qualities {
3979
        use strum::IntoEnumIterator;
3980

3981
        use super::*;
3982
        use crate::config_models::cli_args;
3983
        use crate::models::blockchain::transaction::Transaction;
3984
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3985
        use crate::models::state::wallet::transaction_output::TxOutput;
3986
        use crate::tests::shared::globalstate::mock_genesis_global_state;
3987

3988
        async fn tx_of_proof_quality(
3989
            network: Network,
3990
            quality: TransactionProofQuality,
3991
        ) -> Transaction {
3992
            let wallet_secret = WalletEntropy::devnet_wallet();
3993
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
3994
            let alice_gsl = mock_genesis_global_state(
3995
                1,
3996
                wallet_secret,
3997
                cli_args::Args::default_with_network(network),
3998
            )
3999
            .await;
4000
            let alice = alice_gsl.lock_guard().await;
4001
            let genesis_block = alice.chain.light_state();
4002
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
4003
            let prover_capability = match quality {
4004
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
4005
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
4006
            };
4007
            let config = TxCreationConfig::default()
4008
                .recover_change_off_chain(alice_key.into())
4009
                .with_prover_capability(prover_capability);
4010
            let block_height = genesis_block.header().height;
4011
            let consensus_rule_set = ConsensusRuleSet::infer_from(network, block_height);
4012
            alice_gsl
4013
                .api()
4014
                .tx_initiator_internal()
4015
                .create_transaction(
4016
                    Vec::<TxOutput>::new().into(),
4017
                    NativeCurrencyAmount::coins(1),
4018
                    in_seven_months,
4019
                    config,
4020
                    consensus_rule_set,
4021
                )
4022
                .await
4023
                .unwrap()
4024
                .transaction()
4025
                .clone()
4026
        }
4027

4028
        #[traced_test]
4029
        #[apply(shared_tokio_runtime)]
4030
        async fn client_favors_higher_proof_quality() {
4031
            // In this scenario the peer is informed of a transaction that it
4032
            // already knows, and it's tested that it checks the proof quality
4033
            // field and verifies that it exceeds the proof in the mempool
4034
            // before requesting the transasction.
4035
            let network = Network::Main;
4036
            let proof_collection_tx =
4037
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
4038
            let single_proof_tx =
4039
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
4040

4041
            for (own_tx_pq, new_tx_pq) in
4042
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
4043
            {
4044
                use TransactionProofQuality::*;
4045

4046
                let (
4047
                    _peer_broadcast_tx,
4048
                    from_main_rx_clone,
4049
                    to_main_tx,
4050
                    mut to_main_rx1,
4051
                    mut alice,
4052
                    handshake,
4053
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
4054
                    .await
4055
                    .unwrap();
4056

4057
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
4058
                    (ProofCollection, ProofCollection) => {
4059
                        (&proof_collection_tx, &proof_collection_tx)
4060
                    }
4061
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
4062
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
4063
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
4064
                };
4065

4066
                alice
4067
                    .lock_guard_mut()
4068
                    .await
4069
                    .mempool_insert((*own_tx).to_owned(), UpgradePriority::Irrelevant)
4070
                    .await;
4071

4072
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
4073

4074
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4075
                let mock = if own_proof_is_supreme {
4076
                    Mock::new(vec![
4077
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
4078
                        Action::Read(PeerMessage::Bye),
4079
                    ])
4080
                } else {
4081
                    Mock::new(vec![
4082
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
4083
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
4084
                        Action::Read(PeerMessage::Transaction(Box::new(
4085
                            new_tx.try_into().unwrap(),
4086
                        ))),
4087
                        Action::Read(PeerMessage::Bye),
4088
                    ])
4089
                };
4090

4091
                let now = proof_collection_tx.kernel.timestamp;
4092
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4093
                    to_main_tx,
4094
                    alice.clone(),
4095
                    get_dummy_socket_address(0),
4096
                    handshake,
4097
                    true,
4098
                    1,
4099
                    now,
4100
                );
4101
                let mut peer_state = MutablePeerState::new(handshake.tip_header.height);
4102

4103
                peer_loop_handler
4104
                    .run(mock, from_main_rx_clone, &mut peer_state)
4105
                    .await
4106
                    .unwrap();
4107

4108
                if own_proof_is_supreme {
4109
                    match to_main_rx1.try_recv() {
4110
                        Err(TryRecvError::Empty) => (),
4111
                        Err(TryRecvError::Disconnected) => {
4112
                            panic!("to_main channel must still be open")
4113
                        }
4114
                        Ok(_) => panic!("to_main channel must be empty"),
4115
                    }
4116
                } else {
4117
                    match to_main_rx1.try_recv() {
4118
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
4119
                        Err(TryRecvError::Disconnected) => {
4120
                            panic!("to_main channel must still be open")
4121
                        }
4122
                        Ok(PeerTaskToMain::Transaction(_)) => (),
4123
                        _ => panic!("Unexpected result from channel"),
4124
                    }
4125
                }
4126
            }
4127
        }
4128
    }
4129

4130
    mod sync_challenges {
4131
        use super::*;
4132
        use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests_dyn;
4133

4134
        #[traced_test]
4135
        #[apply(shared_tokio_runtime)]
4136
        async fn bad_sync_challenge_height_greater_than_tip() {
4137
            // Criterium: Challenge height may not exceed that of tip in the
4138
            // request.
4139

4140
            let network = Network::Main;
4141
            let (
4142
                _alice_main_to_peer_tx,
4143
                alice_main_to_peer_rx,
4144
                alice_peer_to_main_tx,
4145
                alice_peer_to_main_rx,
4146
                mut alice,
4147
                alice_hsd,
4148
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
4149
                .await
4150
                .unwrap();
4151
            let genesis_block: Block = Block::genesis(network);
4152
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
4153
                &genesis_block,
4154
                Timestamp::hours(1),
4155
                Default::default(),
4156
                network,
4157
            )
4158
            .await;
4159
            for block in &blocks {
4160
                alice.set_new_tip(block.clone()).await.unwrap();
4161
            }
4162

4163
            let bh12 = blocks.last().unwrap().header().height;
4164
            let sync_challenge = SyncChallenge {
4165
                tip_digest: blocks[9].hash(),
4166
                challenges: [bh12; 10],
4167
            };
4168
            let alice_p2p_messages = Mock::new(vec![
4169
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4170
                Action::Read(PeerMessage::Bye),
4171
            ]);
4172

4173
            let peer_address = get_dummy_socket_address(0);
4174
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4175
                alice_peer_to_main_tx.clone(),
4176
                alice.clone(),
4177
                peer_address,
4178
                alice_hsd,
4179
                false,
4180
                1,
4181
            );
4182
            alice_peer_loop_handler
4183
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4184
                .await
4185
                .unwrap();
4186

4187
            drop(alice_peer_to_main_rx);
4188

4189
            let latest_sanction = alice
4190
                .lock_guard()
4191
                .await
4192
                .net
4193
                .get_peer_standing_from_database(peer_address.ip())
4194
                .await
4195
                .unwrap();
4196
            assert_eq!(
4197
                NegativePeerSanction::InvalidSyncChallenge,
4198
                latest_sanction
4199
                    .latest_punishment
4200
                    .expect("peer must be sanctioned")
4201
                    .0
4202
            );
4203
        }
4204

4205
        #[traced_test]
4206
        #[apply(shared_tokio_runtime)]
4207
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
4208
            // Criterium: Challenge may not point to genesis block, or block 1, as
4209
            // tip.
4210

4211
            let network = Network::Main;
4212
            let genesis_block: Block = Block::genesis(network);
4213

4214
            let alice_cli = cli_args::Args::default();
4215
            let (
4216
                _alice_main_to_peer_tx,
4217
                alice_main_to_peer_rx,
4218
                alice_peer_to_main_tx,
4219
                alice_peer_to_main_rx,
4220
                alice,
4221
                alice_hsd,
4222
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
4223

4224
            let sync_challenge = SyncChallenge {
4225
                tip_digest: genesis_block.hash(),
4226
                challenges: [BlockHeight::genesis(); 10],
4227
            };
4228

4229
            let alice_p2p_messages = Mock::new(vec![
4230
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4231
                Action::Read(PeerMessage::Bye),
4232
            ]);
4233

4234
            let peer_address = get_dummy_socket_address(0);
4235
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4236
                alice_peer_to_main_tx.clone(),
4237
                alice.clone(),
4238
                peer_address,
4239
                alice_hsd,
4240
                false,
4241
                1,
4242
            );
4243
            alice_peer_loop_handler
4244
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4245
                .await
4246
                .unwrap();
4247

4248
            drop(alice_peer_to_main_rx);
4249

4250
            let latest_sanction = alice
4251
                .lock_guard()
4252
                .await
4253
                .net
4254
                .get_peer_standing_from_database(peer_address.ip())
4255
                .await
4256
                .unwrap();
4257
            assert_eq!(
4258
                NegativePeerSanction::InvalidSyncChallenge,
4259
                latest_sanction
4260
                    .latest_punishment
4261
                    .expect("peer must be sanctioned")
4262
                    .0
4263
            );
4264
        }
4265

4266
        #[traced_test]
4267
        #[apply(shared_tokio_runtime)]
4268
        async fn sync_challenge_happy_path() -> Result<()> {
4269
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
4270
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
4271
            // sync mode.
4272

4273
            let mut rng = rand::rng();
4274
            let network = Network::Main;
4275
            let genesis_block: Block = Block::genesis(network);
4276

4277
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
4278
            let alice_cli = cli_args::Args {
4279
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
4280
                ..Default::default()
4281
            };
4282
            let (
4283
                _alice_main_to_peer_tx,
4284
                alice_main_to_peer_rx,
4285
                alice_peer_to_main_tx,
4286
                mut alice_peer_to_main_rx,
4287
                mut alice,
4288
                alice_hsd,
4289
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
4290
            let _alice_socket_address = get_dummy_socket_address(0);
4291

4292
            let (
4293
                _bob_main_to_peer_tx,
4294
                _bob_main_to_peer_rx,
4295
                _bob_peer_to_main_tx,
4296
                _bob_peer_to_main_rx,
4297
                mut bob,
4298
                _bob_hsd,
4299
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
4300
            let bob_socket_address = get_dummy_socket_address(0);
4301

4302
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
4303
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
4304
            assert!(
4305
                block_1.is_valid(&genesis_block, now, network).await,
4306
                "Block must be valid for this test to make sense"
4307
            );
4308
            let alice_tip = &block_1;
4309
            alice.set_new_tip(block_1.clone()).await?;
4310
            bob.set_new_tip(block_1.clone()).await?;
4311

4312
            // produce enough blocks to ensure alice needs to go into sync mode
4313
            // with this block notification.
4314
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
4315
                &block_1,
4316
                network.target_block_interval(),
4317
                (0..rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20))
4318
                    .map(|_| rng.random())
4319
                    .collect_vec(),
4320
                network,
4321
            )
4322
            .await;
4323
            for block in &blocks {
4324
                bob.set_new_tip(block.clone()).await?;
4325
            }
4326
            let bob_tip = blocks.last().unwrap();
4327

4328
            let block_notification_from_bob = PeerBlockNotification {
4329
                hash: bob_tip.hash(),
4330
                height: bob_tip.header().height,
4331
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
4332
            };
4333

4334
            let alice_rng_seed = rng.random::<[u8; 32]>();
4335
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
4336
            let sync_challenge_from_alice = SyncChallenge::generate(
4337
                &block_notification_from_bob,
4338
                alice_tip.header().height,
4339
                alice_rng_clone.random(),
4340
            );
4341

4342
            println!(
4343
                "sync challenge from alice:\n{:?}",
4344
                sync_challenge_from_alice
4345
            );
4346

4347
            let sync_challenge_response_from_bob = bob
4348
                .lock_guard()
4349
                .await
4350
                .response_to_sync_challenge(sync_challenge_from_alice)
4351
                .await
4352
                .expect("should be able to respond to sync challenge");
4353

4354
            let alice_p2p_messages = Mock::new(vec![
4355
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4356
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
4357
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
4358
                    sync_challenge_response_from_bob,
4359
                ))),
4360
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4361
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
4362
                // The absence of a Write here checks that a 2nd challenge isn't sent
4363
                // when a successful was just received.
4364
                Action::Read(PeerMessage::Bye),
4365
            ]);
4366

4367
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
4368
                alice_peer_to_main_tx.clone(),
4369
                alice.clone(),
4370
                bob_socket_address,
4371
                alice_hsd,
4372
                false,
4373
                1,
4374
                bob_tip.header().timestamp,
4375
            );
4376
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
4377
            alice_peer_loop_handler
4378
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4379
                .await?;
4380

4381
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
4382
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
4383
            expected_anchor_mmra.append(bob_tip.hash());
4384
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
4385
                peer_address: bob_socket_address,
4386
                claimed_height: bob_tip.header().height,
4387
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
4388
                claimed_block_mmra: expected_anchor_mmra,
4389
            };
4390
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
4391
            assert_eq!(
4392
                expected_message_from_alice_peer_loop,
4393
                observed_message_from_alice_peer_loop
4394
            );
4395

4396
            Ok(())
4397
        }
4398
    }
4399
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc