• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 16637402370

31 Jul 2025 01:17AM UTC coverage: 74.067% (-0.04%) from 74.103%
16637402370

Pull #637

github

web-flow
Merge 1821519d6 into cb206efa3
Pull Request #637: feat: add dump-db command to neptune-cli

0 of 19 new or added lines in 1 file covered. (0.0%)

9 existing lines in 4 files now uncovered.

22923 of 30949 relevant lines covered (74.07%)

623077.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.64
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::consensus_rule_set::ConsensusRuleSet;
38
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
39
use crate::models::blockchain::transaction::Transaction;
40
use crate::models::channel::MainToPeerTask;
41
use crate::models::channel::PeerTaskToMain;
42
use crate::models::channel::PeerTaskToMainTransaction;
43
use crate::models::peer::handshake_data::HandshakeData;
44
use crate::models::peer::peer_info::PeerConnectionInfo;
45
use crate::models::peer::peer_info::PeerInfo;
46
use crate::models::peer::transfer_block::TransferBlock;
47
use crate::models::peer::BlockProposalRequest;
48
use crate::models::peer::BlockRequestBatch;
49
use crate::models::peer::IssuedSyncChallenge;
50
use crate::models::peer::MutablePeerState;
51
use crate::models::peer::NegativePeerSanction;
52
use crate::models::peer::PeerMessage;
53
use crate::models::peer::PeerSanction;
54
use crate::models::peer::PeerStanding;
55
use crate::models::peer::PositivePeerSanction;
56
use crate::models::peer::SyncChallenge;
57
use crate::models::proof_abstractions::mast_hash::MastHash;
58
use crate::models::proof_abstractions::timestamp::Timestamp;
59
use crate::models::state::block_proposal::BlockProposalRejectError;
60
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
61
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
62
use crate::models::state::GlobalState;
63
use crate::models::state::GlobalStateLock;
64
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
65

// Block-batch sizing bounds. (Presumably consumed by the batch-request
// handling later in this file, outside this chunk — confirm against usage.)
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;

// Upper bound on entries in a `PeerListResponse`; also the flood threshold
// when receiving such a response (see `PeerMessage::PeerListResponse` arm).
const MAX_PEER_LIST_LENGTH: usize = 10;

// Smallest batch size a peer may request — TODO confirm at the request site.
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;

// Named booleans for the `Ok(_)` value returned by `handle_peer_message`:
// `true` means the peer connection should be closed.
const KEEP_CONNECTION_ALIVE: bool = false;
const DISCONNECT_CONNECTION: bool = true;

/// Numeric type in which a peer's standing is expressed.
pub type PeerStandingNumber = i32;
74

75
/// Handles messages from peers via TCP
///
/// also handles messages from main task over the main-to-peer-tasks broadcast
/// channel.
#[derive(Debug, Clone)]
pub struct PeerLoopHandler {
    // Channel to the main task (e.g. `PeerTaskToMain::NewBlocks`,
    // `PeerDiscoveryAnswer`).
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
    // Shared global state; locked for read or write as needed.
    global_state_lock: GlobalStateLock,
    // Socket address of the connected peer; used as key into the peer map.
    peer_address: SocketAddr,
    // Handshake data received from the peer — not read in this chunk.
    peer_handshake_data: HandshakeData,
    // Presumably true iff the peer initiated the connection — not read in
    // this chunk; confirm at the connection-setup call site.
    inbound_connection: bool,
    // Hop distance to this peer; revealed peers are reported at distance + 1.
    distance: u8,
    // RNG, used e.g. when generating sync challenges.
    rng: StdRng,
    // Test-only: when set, `now()` reports this fixed timestamp.
    #[cfg(test)]
    mock_now: Option<Timestamp>,
}
91

92
impl PeerLoopHandler {
93
    pub(crate) fn new(
29✔
94
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
29✔
95
        global_state_lock: GlobalStateLock,
29✔
96
        peer_address: SocketAddr,
29✔
97
        peer_handshake_data: HandshakeData,
29✔
98
        inbound_connection: bool,
29✔
99
        distance: u8,
29✔
100
    ) -> Self {
29✔
101
        Self {
29✔
102
            to_main_tx,
29✔
103
            global_state_lock,
29✔
104
            peer_address,
29✔
105
            peer_handshake_data,
29✔
106
            inbound_connection,
29✔
107
            distance,
29✔
108
            rng: StdRng::from_rng(&mut rand::rng()),
29✔
109
            #[cfg(test)]
29✔
110
            mock_now: None,
29✔
111
        }
29✔
112
    }
29✔
113

114
    /// Allows for mocked timestamps such that time dependencies may be tested.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        // Identical to `new`, except `now()` will report the supplied time.
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);
        handler
    }
136

137
    /// Replace this handler's random number generator with a specific one.
    ///
    /// Handy for making tests deterministic.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        self.rng = rng;
    }
144

145
    /// Current timestamp — or, in test builds, the mocked timestamp when one
    /// was installed via `with_mocked_time`.
    fn now(&self) -> Timestamp {
        #[cfg(not(test))]
        {
            Timestamp::now()
        }
        #[cfg(test)]
        {
            // Fall back to the real clock when no mock is installed.
            self.mock_now.unwrap_or(Timestamp::now())
        }
    }
155

156
    /// Punish a peer for bad behavior.
157
    ///
158
    /// Return `Err` if the peer in question is (now) banned.
159
    ///
160
    /// # Locking:
161
    ///   * acquires `global_state_lock` for write
162
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
7✔
163
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
7✔
164
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
7✔
165
        debug!(
7✔
166
            "Peer standing before punishment is {}",
×
167
            global_state_mut
×
168
                .net
×
169
                .peer_map
×
170
                .get(&self.peer_address)
×
171
                .unwrap()
×
172
                .standing
173
        );
174

175
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
7✔
176
            bail!("Could not read peer map.");
×
177
        };
178
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
7✔
179
        if let Err(err) = sanction_result {
7✔
180
            warn!("Banning peer: {err}");
1✔
181
        }
6✔
182

183
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
7✔
184
    }
7✔
185

186
    /// Reward a peer for good behavior.
187
    ///
188
    /// Return `Err` if the peer in question is banned.
189
    ///
190
    /// # Locking:
191
    ///   * acquires `global_state_lock` for write
192
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
193
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
194
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
195
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
196
            error!("Could not read peer map.");
1✔
197
            return Ok(());
1✔
198
        };
199
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
200
        if sanction_result.is_err() {
18✔
201
            error!("Cannot reward banned peer");
×
202
        }
18✔
203

204
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
205
    }
19✔
206

207
    /// Construct a batch response, with blocks and their MMR membership proofs
    /// relative to a specified anchor.
    ///
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
    /// a block height of the response exceeds own tip height.
    async fn batch_response(
        state: &GlobalState,
        blocks: Vec<Block>,
        anchor: &MmrAccumulator,
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
        let own_tip_height = state.chain.light_state().header().height;
        // Every block height (used as MMR leaf index below) must be a leaf
        // that the requester's anchor already commits to ...
        let block_heights_match_anchor = blocks
            .iter()
            .all(|bl| bl.header().height < anchor.num_leafs().into());
        // ... and must not exceed our own tip.
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
        if !block_heights_match_anchor || !block_heights_known {
            // Dump the relevant quantities for diagnosis, then refuse.
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
                Some(height) => height.to_string(),
                None => "None".to_owned(),
            };

            debug!("max_block_height: {max_block_height}");
            debug!("own_tip_height: {own_tip_height}");
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
            debug!("block_heights_known: {block_heights_known}");
            return None;
        }

        let mut ret = vec![];
        for block in blocks {
            // Prove this block's membership in the archival block-MMR,
            // restricted to the (smaller) anchor MMR supplied by the peer.
            let mmr_mp = state
                .chain
                .archival_state()
                .archival_block_mmr
                .ammr()
                .prove_membership_relative_to_smaller_mmr(
                    block.header().height.into(),
                    anchor.num_leafs(),
                )
                .await;
            // NOTE(review): unwrap assumed safe here — confirm the failure
            // modes of `TryFrom<Block> for TransferBlock` (e.g. genesis).
            let block: TransferBlock = block.try_into().unwrap();
            ret.push((block, mmr_mp));
        }

        Some(ret)
    }
254

255
    /// Handle validation and send all blocks to the main task if they're all
    /// valid. Use with a list of blocks or a single block. When the
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
    /// list is the `i`th block. The parent of element zero in this list is
    /// `parent_of_first_block`.
    ///
    /// # Return Value
    ///  - `Err` when the connection should be closed;
    ///  - `Ok(None)` if some block is invalid
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
    ///    are not syncing;
    ///  - `Ok(None)` if the last block has insufficient height and we are
    ///    syncing;
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
    ///    highest height in the batch.
    ///
    /// A return value of Ok(Some(_)) means that the message was passed on to
    /// main loop.
    ///
    /// # Locking
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    ///
    /// # Panics
    ///
    ///  - Panics if called with the empty list.
    async fn handle_blocks(
        &mut self,
        received_blocks: Vec<Block>,
        parent_of_first_block: Block,
    ) -> Result<Option<BlockHeight>> {
        debug!(
            "attempting to validate {} {}",
            received_blocks.len(),
            if received_blocks.len() == 1 {
                "block"
            } else {
                "blocks"
            }
        );
        let now = self.now();
        debug!("validating with respect to current timestamp {now}");
        // Walk the chain: each block is checked against its predecessor, both
        // for proof-of-work and for general validity. Any failure punishes
        // the peer and aborts with `Ok(None)` (or `Err` if the peer is now
        // banned, via `punish(..)?`).
        let mut previous_block = &parent_of_first_block;
        for new_block in &received_blocks {
            let new_block_has_proof_of_work = new_block.has_proof_of_work(
                self.global_state_lock.cli().network,
                previous_block.header(),
            );
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
            let new_block_is_valid = new_block
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
                .await;
            debug!("new block is valid? {new_block_is_valid}");
            if !new_block_has_proof_of_work {
                warn!(
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
                warn!(
                    "Proof of work should be {} (or more) but was [{}].",
                    previous_block.kernel.header.difficulty.target(),
                    new_block.hash().values().iter().join(", ")
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block due to insufficient PoW");
                return Ok(None);
            } else if !new_block_is_valid {
                warn!(
                    "Received invalid block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block: invalid block");
                return Ok(None);
            }
            info!(
                "Block with height {} is valid. mined: {}",
                new_block.kernel.header.height,
                new_block.kernel.header.timestamp.standard_format()
            );

            previous_block = new_block;
        }

        // evaluate the fork choice rule
        debug!("Checking last block's canonicity ...");
        let last_block = received_blocks.last().unwrap();
        let is_canonical = self
            .global_state_lock
            .lock_guard()
            .await
            .incoming_block_is_more_canonical(last_block);
        let last_block_height = last_block.header().height;
        // While syncing, also accept the batch if it beats the current
        // sync champion's height (or if no champion exists yet).
        let sync_mode_active_and_have_new_champion = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .sync_anchor
            .as_ref()
            .is_some_and(|x| {
                x.champion
                    .is_none_or(|(height, _)| height < last_block_height)
            });
        if !is_canonical && !sync_mode_active_and_have_new_champion {
            warn!(
                "Received {} blocks from peer but incoming blocks are less \
            canonical than current tip, or current sync-champion.",
                received_blocks.len()
            );
            return Ok(None);
        }

        // Send the new blocks to the main task which handles the state update
        // and storage to the database.
        let number_of_received_blocks = received_blocks.len();
        self.to_main_tx
            .send(PeerTaskToMain::NewBlocks(received_blocks))
            .await?;
        info!(
            "Updated block info by block from peer. block height {}",
            last_block_height
        );

        // Valuable, new, hard-to-produce information. Reward peer.
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
            .await?;

        Ok(Some(last_block_height))
    }
394

395
    /// Take a single block received from a peer and (attempt to) find a path
    /// between the received block and a common ancestor stored in the blocks
    /// database.
    ///
    /// This function attempts to find the parent of the received block, either
    /// by searching the database or by requesting it from a peer.
    ///  - If the parent is not stored, it is requested from the peer and the
    ///    received block is pushed to the fork reconciliation list for later
    ///    handling by this function. The fork reconciliation list starts out
    ///    empty, but grows as more parents are requested and transmitted.
    ///  - If the parent is found in the database, a) block handling continues:
    ///    the entire list of fork reconciliation blocks are passed down the
    ///    pipeline, potentially leading to a state update; and b) the fork
    ///    reconciliation list is cleared.
    ///
    /// Locking:
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    async fn try_ensure_path<S>(
        &mut self,
        received_block: Box<Block>,
        peer: &mut S,
        peer_state: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        // Does the received block match the fork reconciliation list?
        // Blocks arrive newest-first, so the most recently stored block must
        // be a valid *successor* of the one we just received.
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
            peer_state.fork_reconciliation_blocks.last()
        {
            let valid = successor
                .is_valid(
                    received_block.as_ref(),
                    self.now(),
                    self.global_state_lock.cli().network,
                )
                .await;
            if !valid {
                warn!(
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
                        peer_state.fork_reconciliation_blocks.len() + 1
                    );
            }
            valid
        } else {
            true
        };

        // Are we running out of RAM?
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
            >= self.global_state_lock.cli().sync_mode_threshold;
        if too_many_blocks {
            warn!(
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
                peer_state.fork_reconciliation_blocks.len() + 1
            );
        }

        // Block mismatch or too many blocks: abort!
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
            self.punish(NegativePeerSanction::ForkResolutionError((
                received_block.header().height,
                peer_state.fork_reconciliation_blocks.len() as u16,
                received_block.hash(),
            )))
            .await?;
            peer_state.fork_reconciliation_blocks = vec![];
            return Ok(());
        }

        // otherwise, append
        peer_state.fork_reconciliation_blocks.push(*received_block);

        // Try fetch parent
        let received_block_header = *peer_state
            .fork_reconciliation_blocks
            .last()
            .unwrap()
            .header();

        let parent_digest = received_block_header.prev_block_digest;
        let parent_height = received_block_header.height.previous()
            .expect("transferred block must have previous height because genesis block cannot be transferred");
        debug!("Try ensure path: fetching parent block");
        let parent_block = self
            .global_state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_block(parent_digest)
            .await?;
        debug!(
            "Completed parent block fetching from DB: {}",
            if parent_block.is_some() {
                "found".to_string()
            } else {
                "not found".to_string()
            }
        );

        // If parent is not known (but not genesis) request it.
        let Some(parent_block) = parent_block else {
            if parent_height.is_genesis() {
                // Peer's chain bottoms out at a different genesis block.
                peer_state.fork_reconciliation_blocks.clear();
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
                return Ok(());
            }
            info!(
                "Parent not known: Requesting previous block with height {} from peer",
                parent_height
            );

            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
                .await?;

            return Ok(());
        };

        // We want to treat the received fork reconciliation blocks (plus the
        // received block) in reverse order, from oldest to newest, because
        // they were requested from high to low block height.
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
        new_blocks.reverse();

        // Reset the fork resolution state since we got all the way back to a
        // block that we have.
        // NOTE(review): the block received in this call was pushed onto the
        // list above, so this flag appears to always be true here; presumably
        // the intent was to detect a multi-block reconciliation — confirm.
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
        peer_state.fork_reconciliation_blocks.clear();

        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
            // If `BlockNotification` was received during a block reconciliation
            // event, then the peer might have one (or more (unlikely)) blocks
            // that we do not have. We should thus request those blocks.
            if fork_reconciliation_event
                && peer_state.highest_shared_block_height > new_block_height
            {
                peer.send(PeerMessage::BlockRequestByHeight(
                    peer_state.highest_shared_block_height,
                ))
                .await?;
            }
        }

        Ok(())
    }
544

545
    /// Handle peer messages and returns Ok(true) if connection should be closed.
546
    /// Connection should also be closed if an error is returned.
547
    /// Otherwise, returns OK(false).
548
    ///
549
    /// Locking:
550
    ///   * Acquires `global_state_lock` for read.
551
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
552
    ///     `self.reward(..)`.
553
    async fn handle_peer_message<S>(
154✔
554
        &mut self,
154✔
555
        msg: PeerMessage,
154✔
556
        peer: &mut S,
154✔
557
        peer_state_info: &mut MutablePeerState,
154✔
558
    ) -> Result<bool>
154✔
559
    where
154✔
560
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
154✔
561
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
154✔
562
        <S as TryStream>::Error: std::error::Error,
154✔
563
    {
154✔
564
        debug!(
154✔
565
            "Received {} from peer {}",
×
566
            msg.get_type(),
×
567
            self.peer_address
568
        );
569
        match msg {
154✔
570
            PeerMessage::Bye => {
571
                // Note that the current peer is not removed from the global_state.peer_map here
572
                // but that this is done by the caller.
573
                info!("Got bye. Closing connection to peer");
43✔
574
                Ok(DISCONNECT_CONNECTION)
43✔
575
            }
576
            PeerMessage::PeerListRequest => {
577
                let peer_info = {
5✔
578
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
579

580
                    // We are interested in the address on which peers accept ingoing connections,
581
                    // not in the address in which they are connected to us. We are only interested in
582
                    // peers that accept incoming connections.
583
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
584
                        .global_state_lock
5✔
585
                        .lock_guard()
5✔
586
                        .await
5✔
587
                        .net
588
                        .peer_map
589
                        .values()
5✔
590
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
591
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
592
                        .map(|peer_info| {
8✔
593
                            (
8✔
594
                                // unwrap is safe bc of above `filter`
8✔
595
                                peer_info.listen_address().unwrap(),
8✔
596
                                peer_info.instance_id(),
8✔
597
                            )
8✔
598
                        })
8✔
599
                        .collect();
5✔
600

601
                    // We sort the returned list, so this function is easier to test
602
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
603
                    peer_info
5✔
604
                };
605

606
                debug!("Responding with: {:?}", peer_info);
5✔
607
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
608
                Ok(KEEP_CONNECTION_ALIVE)
5✔
609
            }
610
            PeerMessage::PeerListResponse(peers) => {
3✔
611
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
612

613
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
614
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
615
                        .await?;
×
616
                }
3✔
617
                self.to_main_tx
3✔
618
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
619
                        peers,
3✔
620
                        self.peer_address,
3✔
621
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
622
                        self.distance + 1,
3✔
623
                    )))
3✔
624
                    .await?;
3✔
625
                Ok(KEEP_CONNECTION_ALIVE)
3✔
626
            }
627
            PeerMessage::BlockNotificationRequest => {
628
                debug!("Got BlockNotificationRequest");
×
629

630
                peer.send(PeerMessage::BlockNotification(
×
631
                    self.global_state_lock
×
632
                        .lock_guard()
×
633
                        .await
×
634
                        .chain
635
                        .light_state()
×
636
                        .into(),
×
637
                ))
638
                .await?;
×
639

640
                Ok(KEEP_CONNECTION_ALIVE)
×
641
            }
642
            PeerMessage::BlockNotification(block_notification) => {
16✔
643
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
644

645
                let (tip_header, sync_anchor_is_set) = {
16✔
646
                    let state = self.global_state_lock.lock_guard().await;
16✔
647
                    (
16✔
648
                        *state.chain.light_state().header(),
16✔
649
                        state.net.sync_anchor.is_some(),
16✔
650
                    )
16✔
651
                };
652
                debug!(
16✔
653
                    "Got BlockNotification of height {}. Own height is {}",
×
654
                    block_notification.height, tip_header.height
655
                );
656

657
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
658
                let now = self.now();
16✔
659
                let time_since_latest_successful_challenge = peer_state_info
16✔
660
                    .successful_sync_challenge_response_time
16✔
661
                    .map(|then| now - then);
16✔
662
                let cooldown_expired = time_since_latest_successful_challenge
16✔
663
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
664
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
665
                    &tip_header,
16✔
666
                    block_notification.height,
16✔
667
                    block_notification.cumulative_proof_of_work,
16✔
668
                    sync_mode_threshold,
16✔
669
                );
670
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
671
                    debug!("sync mode criterion satisfied.");
1✔
672

673
                    if peer_state_info.sync_challenge.is_some() {
1✔
674
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
675
                        return Ok(KEEP_CONNECTION_ALIVE);
×
676
                    }
1✔
677

678
                    info!(
1✔
679
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
680
                    );
681
                    let challenge = SyncChallenge::generate(
1✔
682
                        &block_notification,
1✔
683
                        tip_header.height,
1✔
684
                        self.rng.random(),
1✔
685
                    );
686
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
687
                        challenge,
1✔
688
                        block_notification.cumulative_proof_of_work,
1✔
689
                        self.now(),
1✔
690
                    ));
1✔
691

692
                    debug!("sending challenge ...");
1✔
693
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
694

695
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
696
                }
15✔
697

698
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
699
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
700
                    < block_notification.cumulative_proof_of_work;
15✔
701

702
                debug!("block_is_new: {}", block_is_new);
15✔
703

704
                if block_is_new
15✔
705
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
706
                    && !sync_anchor_is_set
14✔
707
                    && !exceeds_sync_mode_threshold
14✔
708
                {
709
                    debug!(
13✔
710
                        "sending BlockRequestByHeight to peer for block with height {}",
×
711
                        block_notification.height
712
                    );
713
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
714
                        .await?;
13✔
715
                } else {
716
                    debug!(
2✔
717
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
718
                        block_notification.height,
719
                        block_is_new,
720
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
721
                    );
722
                }
723

724
                Ok(KEEP_CONNECTION_ALIVE)
15✔
725
            }
726
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
727
                let response = {
×
728
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
729

730
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
731

732
                    let response = self
2✔
733
                        .global_state_lock
2✔
734
                        .lock_guard()
2✔
735
                        .await
2✔
736
                        .response_to_sync_challenge(sync_challenge)
2✔
737
                        .await;
2✔
738

739
                    match response {
2✔
740
                        Ok(resp) => resp,
×
741
                        Err(e) => {
2✔
742
                            warn!("could not generate sync challenge response:\n{e}");
2✔
743
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
744
                                .await?;
2✔
745
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
746
                        }
747
                    }
748
                };
749

750
                info!(
×
751
                    "Responding to sync challenge from {}",
×
752
                    self.peer_address.ip()
×
753
                );
754
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
755
                    .await?;
×
756

757
                Ok(KEEP_CONNECTION_ALIVE)
×
758
            }
759
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
760
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
761

762
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
763
                info!(
1✔
764
                    "Got sync challenge response from {}",
×
765
                    self.peer_address.ip()
×
766
                );
767

768
                // The purpose of the sync challenge and sync challenge response
769
                // is to avoid going into sync mode based on a malicious target
770
                // fork. Instead of verifying that the claimed proof-of-work
771
                // number is correct (which would require sending and verifying,
772
                // at least, all blocks between luca (whatever that is) and the
773
                // claimed tip), we use a heuristic that requires less
774
                // communication and less verification work. The downside of
775
                // using a heuristic here is a nonzero false positive and false
776
                // negative rate. Note that the false negative event
777
                // (maliciously sending someone into sync mode based on a bogus
778
                // fork) still requires a significant amount of work from the
779
                // attacker, *in addition* to being lucky. Also, down the line
780
                // succinctness (and more specifically, recursive block
781
                // validation) obviates this entire subprotocol.
782

783
                // Did we issue a challenge?
784
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
785
                    warn!("Sync challenge response was not prompted.");
×
786
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
787
                        .await?;
×
788
                    return Ok(KEEP_CONNECTION_ALIVE);
×
789
                };
790

791
                // Reset the challenge, regardless of the response's success.
792
                peer_state_info.sync_challenge = None;
1✔
793

794
                // Does response match issued challenge?
795
                if !challenge_response
1✔
796
                    .matches(self.global_state_lock.cli().network, issued_challenge)
1✔
797
                {
798
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
799
                        .await?;
×
800
                    return Ok(KEEP_CONNECTION_ALIVE);
×
801
                }
1✔
802

803
                // Does response verify?
804
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
805
                let now = self.now();
1✔
806
                if !challenge_response
1✔
807
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
808
                    .await
1✔
809
                {
810
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
811
                        .await?;
×
812
                    return Ok(KEEP_CONNECTION_ALIVE);
×
813
                }
1✔
814

815
                // Does cumulative proof-of-work evolve reasonably?
816
                let own_tip_header = *self
1✔
817
                    .global_state_lock
1✔
818
                    .lock_guard()
1✔
819
                    .await
1✔
820
                    .chain
821
                    .light_state()
1✔
822
                    .header();
1✔
823
                if !challenge_response
1✔
824
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
825
                {
826
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
827
                        .await?;
×
828
                    return Ok(KEEP_CONNECTION_ALIVE);
×
829
                }
1✔
830

831
                // Is there some specific (*i.e.*, not aggregate) proof of work?
832
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
833
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
834
                        .await?;
×
835
                    return Ok(KEEP_CONNECTION_ALIVE);
×
836
                }
1✔
837

838
                // Did it come in time?
839
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
840
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
841
                        .await?;
×
842
                    return Ok(KEEP_CONNECTION_ALIVE);
×
843
                }
1✔
844

845
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
846
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
847

848
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
849
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
850

851
                // Inform main loop
852
                self.to_main_tx
1✔
853
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
854
                        peer_address: self.peer_address,
1✔
855
                        claimed_height: claimed_tip_height,
1✔
856
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
857
                        claimed_block_mmra: sync_mmra_anchor,
1✔
858
                    })
1✔
859
                    .await?;
1✔
860

861
                Ok(KEEP_CONNECTION_ALIVE)
1✔
862
            }
863
            PeerMessage::BlockRequestByHash(block_digest) => {
×
864
                let block = self
×
865
                    .global_state_lock
×
866
                    .lock_guard()
×
867
                    .await
×
868
                    .chain
869
                    .archival_state()
×
870
                    .get_block(block_digest)
×
871
                    .await?;
×
872

873
                match block {
×
874
                    None => {
875
                        // TODO: Consider punishing here
876
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
877
                        Ok(KEEP_CONNECTION_ALIVE)
×
878
                    }
879
                    Some(b) => {
×
880
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
881
                            .await?;
×
882
                        Ok(KEEP_CONNECTION_ALIVE)
×
883
                    }
884
                }
885
            }
886
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
887
                let block_response = {
15✔
888
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
889

890
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
891

892
                    let canonical_block_digest = self
16✔
893
                        .global_state_lock
16✔
894
                        .lock_guard()
16✔
895
                        .await
16✔
896
                        .chain
897
                        .archival_state()
16✔
898
                        .archival_block_mmr
899
                        .ammr()
16✔
900
                        .try_get_leaf(block_height.into())
16✔
901
                        .await;
16✔
902

903
                    let Some(canonical_block_digest) = canonical_block_digest else {
16✔
904
                        let own_tip_height = self
1✔
905
                            .global_state_lock
1✔
906
                            .lock_guard()
1✔
907
                            .await
1✔
908
                            .chain
909
                            .light_state()
1✔
910
                            .header()
1✔
911
                            .height;
912
                        warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
913
                        self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
914
                            .await?;
1✔
915

916
                        return Ok(KEEP_CONNECTION_ALIVE);
1✔
917
                    };
918

919
                    let canonical_chain_block: Block = self
15✔
920
                        .global_state_lock
15✔
921
                        .lock_guard()
15✔
922
                        .await
15✔
923
                        .chain
924
                        .archival_state()
15✔
925
                        .get_block(canonical_block_digest)
15✔
926
                        .await?
15✔
927
                        .unwrap();
15✔
928

929
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
930
                };
931

932
                debug!("Sending block");
15✔
933
                peer.send(block_response).await?;
15✔
934
                debug!("Sent block");
15✔
935
                Ok(KEEP_CONNECTION_ALIVE)
15✔
936
            }
937
            PeerMessage::Block(t_block) => {
32✔
938
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
939

940
                info!(
32✔
941
                    "Got new block from peer {}, height {}, mined {}",
×
942
                    self.peer_address,
943
                    t_block.header.height,
944
                    t_block.header.timestamp.standard_format()
×
945
                );
946
                let new_block_height = t_block.header.height;
32✔
947

948
                let block = match Block::try_from(*t_block) {
32✔
949
                    Ok(block) => Box::new(block),
32✔
950
                    Err(e) => {
×
951
                        warn!("Peer sent invalid block: {e:?}");
×
952
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
953
                            .await?;
×
954

955
                        return Ok(KEEP_CONNECTION_ALIVE);
×
956
                    }
957
                };
958

959
                // Update the value for the highest known height that peer possesses iff
960
                // we are not in a fork reconciliation state.
961
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
962
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
963
                }
22✔
964

965
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
966

967
                // Reward happens as part of `try_ensure_path`
968

969
                Ok(KEEP_CONNECTION_ALIVE)
31✔
970
            }
971
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
972
                known_blocks,
8✔
973
                max_response_len,
8✔
974
                anchor,
8✔
975
            }) => {
976
                debug!(
8✔
977
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
978
                    self.peer_address
979
                );
980

981
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
982
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
983
                        .await?;
×
984

985
                    return Ok(KEEP_CONNECTION_ALIVE);
×
986
                }
8✔
987

988
                // The last block in the list of the peers known block is the
989
                // earliest block, block with lowest height, the peer has
990
                // requested. If it does not belong to canonical chain, none of
991
                // the later will. So we can do an early abort in that case.
992
                let least_preferred = match known_blocks.last() {
8✔
993
                    Some(least_preferred) => *least_preferred,
8✔
994
                    None => {
995
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
996
                            .await?;
×
997

998
                        return Ok(KEEP_CONNECTION_ALIVE);
×
999
                    }
1000
                };
1001

1002
                let state = self.global_state_lock.lock_guard().await;
8✔
1003
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1004
                let luca_is_known = state
8✔
1005
                    .chain
8✔
1006
                    .archival_state()
8✔
1007
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1008
                    .await;
8✔
1009
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1010
                    drop(state);
×
1011
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1012
                        .await?;
×
1013
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1014

1015
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1016
                }
8✔
1017

1018
                // Happy case: At least *one* of the blocks referenced by peer
1019
                // is known to us.
1020
                let first_block_in_response = {
8✔
1021
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1022
                    for block_digest in known_blocks {
10✔
1023
                        if state
10✔
1024
                            .chain
10✔
1025
                            .archival_state()
10✔
1026
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1027
                            .await
10✔
1028
                        {
1029
                            let height = state
8✔
1030
                                .chain
8✔
1031
                                .archival_state()
8✔
1032
                                .get_block_header(block_digest)
8✔
1033
                                .await
8✔
1034
                                .unwrap()
8✔
1035
                                .height;
1036
                            first_block_in_response = Some(height);
8✔
1037
                            debug!(
8✔
1038
                                "Found block in canonical chain for batch response: {}",
×
1039
                                block_digest
1040
                            );
1041
                            break;
8✔
1042
                        }
2✔
1043
                    }
1044

1045
                    first_block_in_response
8✔
1046
                        .expect("existence of LUCA should have been established already.")
8✔
1047
                };
1048

1049
                debug!(
8✔
1050
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1051
                 Now building response from that height."
×
1052
                );
1053

1054
                // Get the relevant blocks, at most batch-size many, descending from the
1055
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1056
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1057
                let max_response_len = cmp::min(
8✔
1058
                    max_response_len,
8✔
1059
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1060
                );
1061
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1062
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1063

1064
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1065
                let response_start_height: u64 = first_block_in_response.into();
8✔
1066
                let mut i: u64 = 1;
8✔
1067
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1068
                    let block_height = response_start_height + i;
31✔
1069
                    match state
31✔
1070
                        .chain
31✔
1071
                        .archival_state()
31✔
1072
                        .archival_block_mmr
31✔
1073
                        .ammr()
31✔
1074
                        .try_get_leaf(block_height)
31✔
1075
                        .await
31✔
1076
                    {
1077
                        Some(digest) => {
23✔
1078
                            digests_of_returned_blocks.push(digest);
23✔
1079
                        }
23✔
1080
                        None => break,
8✔
1081
                    }
1082
                    i += 1;
23✔
1083
                }
1084

1085
                let mut returned_blocks: Vec<Block> =
8✔
1086
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1087
                for block_digest in digests_of_returned_blocks {
31✔
1088
                    let block = state
23✔
1089
                        .chain
23✔
1090
                        .archival_state()
23✔
1091
                        .get_block(block_digest)
23✔
1092
                        .await?
23✔
1093
                        .unwrap();
23✔
1094
                    returned_blocks.push(block);
23✔
1095
                }
1096

1097
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1098

1099
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1100
                drop(state);
8✔
1101

1102
                let Some(response) = response else {
8✔
1103
                    warn!("Unable to satisfy batch-block request");
×
1104
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1105
                        .await?;
×
1106
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1107
                };
1108

1109
                debug!("Returning {} blocks in batch response", response.len());
8✔
1110

1111
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1112
                peer.send(response).await?;
8✔
1113

1114
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1115
            }
1116
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1117
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1118

1119
                debug!(
×
1120
                    "handling block response batch with {} blocks",
×
1121
                    authenticated_blocks.len()
×
1122
                );
1123

1124
                // (Alan:) why is there even a minimum?
1125
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1126
                    warn!("Got smaller batch response than allowed");
×
1127
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1128
                        .await?;
×
1129
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1130
                }
×
1131

1132
                // Verify that we are in fact in syncing mode
1133
                // TODO: Separate peer messages into those allowed under syncing
1134
                // and those that are not
1135
                let Some(sync_anchor) = self
×
1136
                    .global_state_lock
×
1137
                    .lock_guard()
×
1138
                    .await
×
1139
                    .net
1140
                    .sync_anchor
1141
                    .clone()
×
1142
                else {
1143
                    warn!("Received a batch of blocks without being in syncing mode");
×
1144
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1145
                        .await?;
×
1146
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1147
                };
1148

1149
                // Verify that the response matches the current state
1150
                // We get the latest block from the DB here since this message is
1151
                // only valid for archival nodes.
1152
                let (first_block, _) = &authenticated_blocks[0];
×
1153
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1154
                let most_canonical_own_block_match: Option<Block> = self
×
1155
                    .global_state_lock
×
1156
                    .lock_guard()
×
1157
                    .await
×
1158
                    .chain
1159
                    .archival_state()
×
1160
                    .get_block(first_blocks_parent_digest)
×
1161
                    .await
×
1162
                    .expect("Block lookup must succeed");
×
1163
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1164
                    Some(block) => block,
×
1165
                    None => {
1166
                        warn!("Got batch response with invalid start block");
×
1167
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1168
                            .await?;
×
1169
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1170
                    }
1171
                };
1172

1173
                // Convert all blocks to Block objects
1174
                debug!(
×
1175
                    "Found own block of height {} to match received batch",
×
1176
                    most_canonical_own_block_match.kernel.header.height
×
1177
                );
1178
                let mut received_blocks = vec![];
×
1179
                for (t_block, membership_proof) in authenticated_blocks {
×
1180
                    let Ok(block) = Block::try_from(t_block) else {
×
1181
                        warn!("Received invalid transfer block from peer");
×
1182
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1183
                            .await?;
×
1184
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1185
                    };
1186

1187
                    if !membership_proof.verify(
×
1188
                        block.header().height.into(),
×
1189
                        block.hash(),
×
1190
                        &sync_anchor.block_mmr.peaks(),
×
1191
                        sync_anchor.block_mmr.num_leafs(),
×
1192
                    ) {
×
1193
                        warn!("Authentication of received block fails relative to anchor");
×
1194
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1195
                            .await?;
×
1196
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1197
                    }
×
1198

1199
                    received_blocks.push(block);
×
1200
                }
1201

1202
                // Get the latest block that we know of and handle all received blocks
1203
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1204
                    .await?;
×
1205

1206
                // Reward happens as part of `handle_blocks`.
1207

1208
                Ok(KEEP_CONNECTION_ALIVE)
×
1209
            }
1210
            PeerMessage::UnableToSatisfyBatchRequest => {
1211
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1212
                warn!(
×
1213
                    "Peer {} reports inability to satisfy batch request.",
×
1214
                    self.peer_address
1215
                );
1216

1217
                Ok(KEEP_CONNECTION_ALIVE)
×
1218
            }
1219
            PeerMessage::Handshake { .. } => {
1220
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1221

1222
                // The handshake should have been sent during connection
1223
                // initialization. Here it is out of order at best, malicious at
1224
                // worst.
1225
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1226
                Ok(KEEP_CONNECTION_ALIVE)
×
1227
            }
1228
            PeerMessage::ConnectionStatus(_) => {
1229
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1230

1231
                // The connection status should have been sent during connection
1232
                // initialization. Here it is out of order at best, malicious at
1233
                // worst.
1234

1235
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1236
                Ok(KEEP_CONNECTION_ALIVE)
×
1237
            }
1238
            PeerMessage::Transaction(transaction) => {
7✔
1239
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
7✔
1240

1241
                debug!(
7✔
1242
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1243
                    transaction.kernel.inputs.len(),
×
1244
                    transaction.kernel.outputs.len(),
×
1245
                    transaction.kernel.mutator_set_hash
×
1246
                );
1247

1248
                let transaction: Transaction = (*transaction).into();
7✔
1249

1250
                let (tip, mutator_set_accumulator_after, current_block_height) = {
7✔
1251
                    let state = self.global_state_lock.lock_guard().await;
7✔
1252

1253
                    (
7✔
1254
                        state.chain.light_state().hash(),
7✔
1255
                        state
7✔
1256
                            .chain
7✔
1257
                            .light_state()
7✔
1258
                            .mutator_set_accumulator_after()
7✔
1259
                            .expect("Block from state must have mutator set after"),
7✔
1260
                        state.chain.light_state().header().height,
7✔
1261
                    )
7✔
1262
                };
1263

1264
                // 1. If transaction is invalid, punish.
1265
                let network = self.global_state_lock.cli().network;
7✔
1266
                let consensus_rule_set =
7✔
1267
                    ConsensusRuleSet::infer_from(network, current_block_height);
7✔
1268
                if !transaction.is_valid(network, consensus_rule_set).await {
7✔
1269
                    warn!("Received invalid tx");
×
1270
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1271
                        .await?;
×
1272
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1273
                }
7✔
1274

1275
                // 2. If transaction has coinbase, punish.
1276
                // Transactions received from peers have not been mined yet.
1277
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
1278
                if transaction.kernel.coinbase.is_some() {
7✔
1279
                    warn!("Received non-mined transaction with coinbase.");
×
1280
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1281
                        .await?;
×
1282
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1283
                }
7✔
1284

1285
                // 3. If negative fee, punish.
1286
                if transaction.kernel.fee.is_negative() {
7✔
1287
                    warn!("Received negative-fee transaction.");
×
1288
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1289
                        .await?;
×
1290
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1291
                }
7✔
1292

1293
                // 4. If transaction is already known, ignore.
1294
                if self
7✔
1295
                    .global_state_lock
7✔
1296
                    .lock_guard()
7✔
1297
                    .await
7✔
1298
                    .mempool
1299
                    .contains_with_higher_proof_quality(
7✔
1300
                        transaction.kernel.txid(),
7✔
1301
                        transaction.proof.proof_quality()?,
7✔
1302
                        transaction.kernel.mutator_set_hash,
7✔
1303
                    )
1304
                {
1305
                    warn!("Received transaction that was already known");
×
1306

1307
                    // We received a transaction that we *probably* haven't requested.
1308
                    // Consider punishing here, if this is abused.
1309
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1310
                }
7✔
1311

1312
                // 5. if transaction is not confirmable, punish.
1313
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
7✔
1314
                    warn!(
×
1315
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1316
                        transaction.kernel.txid()
×
1317
                    );
1318
                    // get fine-grained error code for informative logging
1319
                    let confirmability_error_code = transaction
×
1320
                        .kernel
×
1321
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1322
                    match confirmability_error_code {
×
1323
                        Ok(_) => unreachable!(),
×
1324
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1325
                            warn!("invalid removal record (at index {index})");
×
1326
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1327
                            let removal_record_error_code = invalid_removal_record
×
1328
                                .validate_inner(&mutator_set_accumulator_after);
×
1329
                            debug!(
×
1330
                                "Absolute index set of removal record {index}: {:?}",
×
1331
                                invalid_removal_record.absolute_indices
1332
                            );
1333
                            match removal_record_error_code {
×
1334
                                Ok(_) => unreachable!(),
×
1335
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1336
                                    debug!("invalid because membership proof is missing");
×
1337
                                }
1338
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1339
                                    chunk_index,
×
1340
                                }) => {
1341
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1342
                                }
1343
                            };
1344
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1345
                                .await?;
×
1346
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1347
                        }
1348
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1349
                            warn!("duplicate inputs");
×
1350
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1351
                                .await?;
×
1352
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1353
                        }
1354
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1355
                            warn!("already spent input (at index {index})");
×
1356
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1357
                                .await?;
×
1358
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1359
                        }
1360
                        Err(TransactionConfirmabilityError::RemovalRecordUnpackFailure) => {
1361
                            warn!("Failed to unpack removal records");
×
1362
                            self.punish(NegativePeerSanction::InvalidTransaction)
×
1363
                                .await?;
×
1364
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1365
                        }
1366
                    };
1367
                }
7✔
1368

1369
                // If transaction cannot be applied to mutator set, punish.
1370
                // I don't think this can happen when above checks pass but we include
1371
                // the check to ensure that transaction can be applied.
1372

1373
                // TODO: Try unpacking tx-inputs
1374
                let ms_update = MutatorSetUpdate::new(
7✔
1375
                    transaction.kernel.inputs.clone(),
7✔
1376
                    transaction.kernel.outputs.clone(),
7✔
1377
                );
1378
                let can_apply = ms_update
7✔
1379
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
7✔
1380
                    .is_ok();
7✔
1381
                if !can_apply {
7✔
1382
                    warn!("Cannot apply transaction to current mutator set.");
×
1383
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1384
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1385
                        .await?;
×
1386
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1387
                }
7✔
1388

1389
                let tx_timestamp = transaction.kernel.timestamp;
7✔
1390

1391
                // 6. Ignore if transaction is too old
1392
                let now = self.now();
7✔
1393
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
7✔
1394
                    // TODO: Consider punishing here
1395
                    warn!("Received too old tx");
×
1396
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1397
                }
7✔
1398

1399
                // 7. Ignore if transaction is too far into the future
1400
                if tx_timestamp
7✔
1401
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
7✔
1402
                {
1403
                    // TODO: Consider punishing here
1404
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1405
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1406
                }
7✔
1407

1408
                // Otherwise, relay to main
1409
                let pt2m_transaction = PeerTaskToMainTransaction {
7✔
1410
                    transaction,
7✔
1411
                    confirmable_for_block: tip,
7✔
1412
                };
7✔
1413
                self.to_main_tx
7✔
1414
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
7✔
1415
                    .await?;
7✔
1416

1417
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1418
            }
1419
            PeerMessage::TransactionNotification(tx_notification) => {
14✔
1420
                // addresses #457
1421
                // new scope for state read-lock to avoid holding across peer.send()
1422
                {
1423
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
14✔
1424

1425
                    // 1. Ignore if we already know this transaction, and
1426
                    // the proof quality is not higher than what we already know.
1427
                    let state = self.global_state_lock.lock_guard().await;
14✔
1428
                    let transaction_of_same_or_higher_proof_quality_is_known =
14✔
1429
                        state.mempool.contains_with_higher_proof_quality(
14✔
1430
                            tx_notification.txid,
14✔
1431
                            tx_notification.proof_quality,
14✔
1432
                            tx_notification.mutator_set_hash,
14✔
1433
                        );
1434
                    if transaction_of_same_or_higher_proof_quality_is_known {
14✔
1435
                        debug!("transaction with same or higher proof quality was already known");
7✔
1436
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1437
                    }
7✔
1438

1439
                    // Only accept transactions that do not require executing
1440
                    // `update`.
1441
                    if state
7✔
1442
                        .chain
7✔
1443
                        .light_state()
7✔
1444
                        .mutator_set_accumulator_after()
7✔
1445
                        .expect("Block from state must have mutator set after")
7✔
1446
                        .hash()
7✔
1447
                        != tx_notification.mutator_set_hash
7✔
1448
                    {
1449
                        debug!("transaction refers to non-canonical mutator set state");
×
1450
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1451
                    }
7✔
1452
                }
1453

1454
                // 2. Request the actual `Transaction` from peer
1455
                debug!("requesting transaction from peer");
7✔
1456
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
7✔
1457
                    .await?;
7✔
1458

1459
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1460
            }
1461
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1462
                let state = self.global_state_lock.lock_guard().await;
5✔
1463
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1464
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1465
                };
1466

1467
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1468
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1469
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1470
                };
1471

1472
                // Drop state immediately to prevent holding over a response.
1473
                drop(state);
4✔
1474

1475
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1476
                    .await?;
4✔
1477

1478
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1479
            }
1480
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1481
                let verdict = self
1✔
1482
                    .global_state_lock
1✔
1483
                    .lock_guard()
1✔
1484
                    .await
1✔
1485
                    .favor_incoming_block_proposal_legacy(
1✔
1486
                        block_proposal_notification.height,
1✔
1487
                        block_proposal_notification.guesser_fee,
1✔
1488
                    );
1489
                match verdict {
1✔
1490
                    Ok(_) => {
1491
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1492
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1493
                        ))
1✔
1494
                        .await?
1✔
1495
                    }
1496
                    Err(reject_reason) => {
×
1497
                        info!(
×
1498
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1499
                        {}. Reason:\n{reject_reason}",
×
1500
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1501
                        self.peer_address
1502
                    )
1503
                    }
1504
                }
1505

1506
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1507
            }
1508
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1509
                let matching_proposal = self
×
1510
                    .global_state_lock
×
1511
                    .lock_guard()
×
1512
                    .await
×
1513
                    .mining_state
1514
                    .block_proposal
1515
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1516
                    .map(|x| x.to_owned());
×
1517
                if let Some(proposal) = matching_proposal {
×
1518
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1519
                        .await?;
×
1520
                } else {
1521
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1522
                        .await?;
×
1523
                }
1524

1525
                Ok(KEEP_CONNECTION_ALIVE)
×
1526
            }
1527
            PeerMessage::BlockProposal(new_proposal) => {
1✔
1528
                info!("Got block proposal from peer.");
1✔
1529

1530
                // Is the proposal valid?
1531
                // Lock needs to be held here because race conditions: otherwise
1532
                // the block proposal that was validated might not match with
1533
                // the one whose favorability is being computed.
1534
                let state = self.global_state_lock.lock_guard().await;
1✔
1535
                let tip = state.chain.light_state();
1✔
1536
                let proposal_is_valid = new_proposal
1✔
1537
                    .is_valid(tip, self.now(), self.global_state_lock.cli().network)
1✔
1538
                    .await;
1✔
1539
                if !proposal_is_valid {
1✔
1540
                    drop(state);
×
1541
                    self.punish(NegativePeerSanction::InvalidBlockProposal)
×
1542
                        .await?;
×
1543
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1544
                }
1✔
1545

1546
                // Is block proposal favorable?
1547
                let is_favorable = state.favor_incoming_block_proposal(
1✔
1548
                    new_proposal.header().prev_block_digest,
1✔
1549
                    new_proposal
1✔
1550
                        .total_guesser_reward()
1✔
1551
                        .expect("Block was validated"),
1✔
1552
                );
1✔
1553
                drop(state);
1✔
1554

1555
                if let Err(rejection_reason) = is_favorable {
1✔
1556
                    match rejection_reason {
×
1557
                        // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1558
                        BlockProposalRejectError::InsufficientFee { current, received }
×
1559
                            if Some(received) == current =>
×
1560
                        {
1561
                            debug!("ignoring new block proposal because the fee is equal to the present one");
×
1562
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1563
                        }
1564
                        _ => {
1565
                            warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1566
                            self.punish(NegativePeerSanction::NonFavorableBlockProposal)
×
1567
                                .await?;
×
1568
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1569
                        }
1570
                    }
1571
                };
1✔
1572

1573
                self.send_to_main(PeerTaskToMain::BlockProposal(new_proposal), line!())
1✔
1574
                    .await?;
1✔
1575

1576
                // Valuable, new, hard-to-produce information. Reward peer.
1577
                self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1578

1579
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1580
            }
1581
        }
1582
    }
154✔
1583

1584
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1585
    ///
1586
    /// the channel could potentially fill up in which case the send() will
1587
    /// block until there is capacity.  we wrap the send() so we can log if
1588
    /// that ever happens to the extent it passes slow-scope threshold.
1589
    async fn send_to_main(
1✔
1590
        &self,
1✔
1591
        msg: PeerTaskToMain,
1✔
1592
        line: u32,
1✔
1593
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1594
        // we measure across the send() in case the channel ever fills up.
1595
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1596

1597
        self.to_main_tx.send(msg).await
1✔
1598
    }
1✔
1599

1600
    /// Handle message from main task. The boolean return value indicates if
    /// the connection should be closed (`DISCONNECT_CONNECTION`) or kept open
    /// (`KEEP_CONNECTION_ALIVE`).
    ///
    /// Main broadcasts to all peer tasks, so arms that target a single peer
    /// (`RequestBlockBatch`, `PeerSynchronizationTimeout`, `Disconnect`,
    /// `MakeSpecificPeerDiscoveryRequest`) filter on `self.peer_address` and
    /// no-op for every other peer task.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write via Self::punish()
    async fn handle_main_task_message<S>(
        &mut self,
        msg: MainToPeerTask,
        peer: &mut S,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<bool>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        debug!("Handling {} message from main in peer loop", msg.get_type());
        match msg {
            MainToPeerTask::Block(block) => {
                // We don't currently differentiate whether a new block came from a peer, or from our
                // own miner. It's always shared through this logic.
                let new_block_height = block.kernel.header.height;
                // Only notify the peer about blocks above what we last shared
                // with it, to avoid redundant notifications.
                if new_block_height > peer_state_info.highest_shared_block_height {
                    debug!("Sending PeerMessage::BlockNotification");
                    peer_state_info.highest_shared_block_height = new_block_height;
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
                        .await?;
                    debug!("Sent PeerMessage::BlockNotification");
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
                // Only ask one of the peers about the batch of blocks
                if batch_block_request.peer_addr_target != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                // Cap the requested batch size by both the protocol's standard
                // batch size and the locally configured sync-mode threshold.
                let max_response_len = std::cmp::min(
                    STANDARD_BLOCK_BATCH_SIZE,
                    self.global_state_lock.cli().sync_mode_threshold,
                );

                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: batch_block_request.known_blocks,
                    max_response_len,
                    anchor: batch_block_request.anchor_mmr,
                }))
                .await?;

                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");

                // This sanction is addressed to a single peer.
                if self.peer_address != socket_addr {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                self.punish(NegativePeerSanction::SynchronizationTimeout)
                    .await?;

                // If this peer failed the last synchronization attempt, we only
                // sanction, we don't disconnect.
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::MakePeerDiscoveryRequest => {
                // Broadcast request: every peer task asks its peer for its peer list.
                peer.send(PeerMessage::PeerListRequest).await?;
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::Disconnect(peer_address) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");

                // Only disconnect from the peer the main task requested a disconnect for.
                if peer_address != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }
                // Record the graceful disconnect before signalling loop exit.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::DisconnectAll() => {
                // No address filter: every peer task disconnects its peer.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
                // Targeted variant of MakePeerDiscoveryRequest; only the
                // addressed peer task sends the request.
                if target_socket_addr == self.peer_address {
                    peer.send(PeerMessage::PeerListRequest).await?;
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::TransactionNotification(transaction_notification) => {
                // Relay the notification (not the full transaction) to the peer;
                // the peer requests the transaction itself if interested.
                debug!("Sending PeerMessage::TransactionNotification");
                peer.send(PeerMessage::TransactionNotification(
                    transaction_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::TransactionNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
                // Same relay pattern as transaction notifications: announce
                // only; the peer pulls the proposal on demand.
                debug!("Sending PeerMessage::BlockProposalNotification");
                peer.send(PeerMessage::BlockProposalNotification(
                    block_proposal_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::BlockProposalNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
        }
    }
30✔
1711

1712
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
    /// or a message from main over the main-to-peer-tasks broadcast channel.
    ///
    /// Exits normally (`Ok(())`) when the peer closes the connection, when a
    /// message handler requests disconnection, or when main orders a
    /// disconnect; returns `Err` when a peer-message handler fails, so the
    /// caller (`run_wrapper`) can propagate the error after cleanup.
    async fn run<S>(
        &mut self,
        mut peer: S,
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        loop {
            select! {
                // Handle peer messages
                peer_message = peer.try_next() => {
                    let peer_address = self.peer_address;
                    let peer_message = match peer_message {
                        Ok(message) => message,
                        Err(err) => {
                            // Don't disconnect if message type is unknown, as
                            // this allows the adding of new message types in
                            // the future. Consider only keeping connection open
                            // if this is a deserialization error, and close
                            // otherwise.
                            let msg = format!("Error when receiving from peer: {peer_address}");
                            warn!("{msg}. Error: {err}");
                            self.punish(NegativePeerSanction::InvalidMessage).await?;
                            continue;
                        }
                    };
                    // `None` from the stream means the peer closed the connection.
                    let Some(peer_message) = peer_message else {
                        info!("Peer {peer_address} closed connection.");
                        break;
                    };

                    // A set sync anchor means we are in sync mode; some message
                    // types are only meaningful in one of the two modes.
                    let syncing =
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
                    let message_type = peer_message.get_type();
                    if peer_message.ignore_during_sync() && syncing {
                        debug!(
                            "Ignoring {message_type} message when syncing, from {peer_address}",
                        );
                        continue;
                    }
                    if peer_message.ignore_when_not_sync() && !syncing {
                        debug!(
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
                        );
                        continue;
                    }

                    // `true` from the handler requests disconnection; an error
                    // aborts the loop and is propagated to the caller.
                    match self
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
                        .await
                    {
                        Ok(false) => {}
                        Ok(true) => {
                            info!("Closing connection to {peer_address}");
                            break;
                        }
                        Err(err) => {
                            warn!("Closing connection to {peer_address} because of error {err}.");
                            bail!("{err}");
                        }
                    };
                }

                // Handle messages from main task
                main_msg_res = from_main_rx.recv() => {
                    // A failed recv() on the broadcast channel means main is
                    // gone (or this receiver lagged); treated as fatal.
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
                        let err_msg = format!("Failed to read from main loop: {err}");
                        error!(err_msg);
                        panic!("{err_msg}");
                    });
                    // Errors from the handler are logged and converted into a
                    // disconnect rather than propagated.
                    let close_connection = self
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
                        .await
                        .unwrap_or_else(|err| {
                            warn!("handle_main_task_message returned an error: {err}");
                            true
                        });

                    if close_connection {
                        info!(
                            "handle_main_task_message is closing the connection to {}",
                            self.peer_address
                        );
                        break;
                    }
                }
            }
        }

        Ok(())
    }
44✔
1809

1810
    /// Function called before entering the peer loop. Reads the potentially stored
    /// peer standing from the database and does other book-keeping before entering
    /// its final resting place: the `peer_loop`. Note that the peer has already been
    /// accepted for a connection for this loop to be entered. So we don't need
    /// to check the standing again.
    ///
    /// On exit (normal or error) the peer is removed from shared state via
    /// `close_peer_connected_callback`, and any error from the inner loop is
    /// returned to the caller.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write
    pub(crate) async fn run_wrapper<S>(
        &mut self,
        mut peer: S,
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;

        let cli_args = self.global_state_lock.cli().clone();

        // Restore this IP's persisted standing, or start fresh at the
        // configured tolerance if we have never seen it before.
        let standing = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_databases
            .peer_standings
            .get(self.peer_address.ip())
            .await
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));

        // Add peer to peer map
        let peer_connection_info = PeerConnectionInfo::new(
            self.peer_handshake_data.listen_port,
            self.peer_address,
            self.inbound_connection,
        );
        let new_peer = PeerInfo::new(
            peer_connection_info,
            &self.peer_handshake_data,
            SystemTime::now(),
            cli_args.peer_tolerance,
        )
        .with_standing(standing);

        // If timestamps are different, we currently just log a warning.
        // Checked in both directions: either clock may be the one ahead.
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
        {
            let own_datetime_utc: DateTime<Utc> =
                new_peer.own_timestamp_connection_established.into();
            let peer_datetime_utc: DateTime<Utc> =
                new_peer.peer_timestamp_connection_established.into();
            warn!(
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
                new_peer.connected_address(),
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
        }

        // Multiple tasks might attempt to set up a connection concurrently. So
        // even though we've checked that this connection is allowed, this check
        // could have been invalidated by another task, for one accepting an
        // incoming connection from a peer we're currently connecting to. So we
        // need to make the check again while holding a write-lock, since
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
        // since it would have to be dropped before acquiring the write-lock.
        {
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
            let peer_map = &mut global_state.net.peer_map;
            // Reject a second connection to the same node instance, even if it
            // arrives from a different socket address.
            if peer_map
                .values()
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
            {
                bail!("Attempted to connect to already connected peer. Aborting connection.");
            }

            if peer_map.len() >= cli_args.max_num_peers {
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
            }

            if peer_map.contains_key(&self.peer_address) {
                // This shouldn't be possible, unless the peer reports a different instance ID than
                // for the other connection. Only a malignant client would do that.
                bail!("Already connected to peer. Aborting connection");
            }

            peer_map.insert(self.peer_address, new_peer);
        }

        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);

        // If peer indicates more canonical block, request a block notification to catch up ASAP
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
            > self
                .global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work
        {
            // Send block notification request to catch up ASAP, in case we're
            // behind the newly-connected peer.
            peer.send(PeerMessage::BlockNotificationRequest).await?;
        }

        let res = self.run(peer, from_main_rx, &mut peer_state).await;
        debug!("Exited peer loop for {}", self.peer_address);

        // Always clean up shared state, regardless of how `run` exited.
        close_peer_connected_callback(
            self.global_state_lock.clone(),
            self.peer_address,
            &self.to_main_tx,
        )
        .await;

        debug!("Ending peer loop for {}", self.peer_address);

        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
        // feature to have for testing purposes.
        res
    }
32✔
1940

1941
    /// Register graceful peer disconnection in the global state.
1942
    ///
1943
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1944
    ///
1945
    /// # Locking:
1946
    ///   * acquires `global_state_lock` for write
1947
    ///
1948
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1949
    async fn register_peer_disconnection(&mut self) {
×
1950
        let peer_id = self.peer_handshake_data.instance_id;
×
1951
        self.global_state_lock
×
1952
            .lock_guard_mut()
×
1953
            .await
×
1954
            .net
1955
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1956
    }
×
1957
}
1958

1959
#[cfg(test)]
1960
#[cfg_attr(coverage_nightly, coverage(off))]
1961
mod tests {
1962
    use macro_rules_attr::apply;
1963
    use rand::rngs::StdRng;
1964
    use rand::Rng;
1965
    use rand::SeedableRng;
1966
    use tokio::sync::mpsc::error::TryRecvError;
1967
    use tracing_test::traced_test;
1968

1969
    use super::*;
1970
    use crate::config_models::cli_args;
1971
    use crate::config_models::network::Network;
1972
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1973
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1974
    use crate::models::peer::transaction_notification::TransactionNotification;
1975
    use crate::models::peer::Sanction;
1976
    use crate::models::state::mempool::upgrade_priority::UpgradePriority;
1977
    use crate::models::state::tx_creation_config::TxCreationConfig;
1978
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1979
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1980
    use crate::tests::shared::blocks::fake_valid_block_for_tests;
1981
    use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests;
1982
    use crate::tests::shared::globalstate::get_dummy_handshake_data_for_genesis;
1983
    use crate::tests::shared::globalstate::get_dummy_peer_connection_data_genesis;
1984
    use crate::tests::shared::globalstate::get_dummy_socket_address;
1985
    use crate::tests::shared::globalstate::get_test_genesis_setup;
1986
    use crate::tests::shared::mock_tx::invalid_empty_single_proof_transaction;
1987
    use crate::tests::shared::Action;
1988
    use crate::tests::shared::Mock;
1989
    use crate::tests::shared_tokio_runtime;
1990

1991
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn no_disconnect_on_invalid_message() {
        // This test is intended to simulate a deserialization error, which
        // would occur if the node is connected to a newer version of the
        // software which has new variants of `PeerMessage`. The intended
        // behavior is that a small punishment is applied but that the
        // connection stays open.

        // First action yields a read error; the connection must survive it and
        // then terminate gracefully on the subsequent `Bye`.
        let mock = Mock::new(vec![Action::ReadError, Action::Read(PeerMessage::Bye)]);

        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
            get_test_genesis_setup(Network::Main, 1, cli_args::Args::default())
                .await
                .unwrap();

        let peer_address = get_dummy_socket_address(2);
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
        let mut peer_loop_handler =
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);

        // `unwrap` asserts the peer loop exited without error despite the bad message.
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        // The peer must have been sanctioned exactly once, with the
        // `InvalidMessage` punishment, and this must be persisted to the
        // standings database.
        let peer_standing = state_lock
            .lock_guard()
            .await
            .net
            .get_peer_standing_from_database(peer_address.ip())
            .await;
        assert_eq!(
            NegativePeerSanction::InvalidMessage.severity(),
            peer_standing.unwrap().standing
        );
        assert_eq!(
            NegativePeerSanction::InvalidMessage,
            peer_standing.unwrap().latest_punishment.unwrap().0
        );
    }
2030

2031
    #[traced_test]
2032
    #[apply(shared_tokio_runtime)]
2033
    async fn test_peer_loop_bye() -> Result<()> {
2034
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2035

2036
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2037
            get_test_genesis_setup(Network::Main, 2, cli_args::Args::default()).await?;
2038

2039
        let peer_address = get_dummy_socket_address(2);
2040
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2041
        let mut peer_loop_handler =
2042
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
2043
        peer_loop_handler
2044
            .run_wrapper(mock, from_main_rx_clone)
2045
            .await?;
2046

2047
        assert_eq!(
2048
            2,
2049
            state_lock.lock_guard().await.net.peer_map.len(),
2050
            "peer map length must be back to 2 after goodbye"
2051
        );
2052

2053
        Ok(())
2054
    }
2055

2056
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn test_peer_loop_peer_list() {
        // Scenario: a third peer asks for the peer list. The response must
        // contain all connected peers -- the two pre-existing ones plus the
        // requester itself -- and the requester must be dropped from the peer
        // map after its `Bye`.
        let network = Network::Main;
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
            get_test_genesis_setup(network, 2, cli_args::Args::default())
                .await
                .unwrap();

        // Snapshot the two pre-existing peers, sorted by address so the
        // expected response order is deterministic.
        let mut peer_infos = state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .clone()
            .into_values()
            .collect::<Vec<_>>();
        peer_infos.sort_by_cached_key(|x| x.connected_address());
        let (peer_address0, instance_id0) = (
            peer_infos[0].connected_address(),
            peer_infos[0].instance_id(),
        );
        let (peer_address1, instance_id1) = (
            peer_infos[1].connected_address(),
            peer_infos[1].instance_id(),
        );

        // Handshake data and socket address for the third (requesting) peer.
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(network, 2);
        let expected_response = vec![
            (peer_address0, instance_id0),
            (peer_address1, instance_id1),
            (sa2, hsd2.instance_id),
        ];
        // `Action::Write` asserts that the handler emits exactly this response.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::PeerListRequest),
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        let mut peer_loop_handler =
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        assert_eq!(
            2,
            state_lock.lock_guard().await.net.peer_map.len(),
            "peer map must have length 2 after saying goodbye to peer 2"
        );
    }
2110

2111
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
    {
        // When the *peer* ends the connection with a `Bye`, the node must not
        // record a disconnection time for it; that bookkeeping is reserved for
        // disconnects initiated by this node.
        let args = cli_args::Args::default();
        let network = args.network;
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
            get_test_genesis_setup(network, 0, args).await?;

        let peer_address = get_dummy_socket_address(0);
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
        let peer_id = peer_handshake_data.instance_id;
        let mut peer_loop_handler = PeerLoopHandler::new(
            to_main_tx,
            state_lock.clone(),
            peer_address,
            peer_handshake_data,
            true,
            1,
        );
        // Peer-initiated disconnect: the mock peer only says `Bye`.
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;

        // No disconnection time may have been registered for this peer.
        let global_state = state_lock.lock_guard().await;
        assert!(global_state
            .net
            .last_disconnection_time_of_peer(peer_id)
            .is_none());

        // Keep the channel endpoints alive until the assertions above have
        // run, then drop them explicitly.
        drop(to_main_rx);
        drop(from_main_tx);

        Ok(())
    }
2145

2146
    mod blocks {
2147
        use super::*;
2148

2149
        #[traced_test]
2150
        #[apply(shared_tokio_runtime)]
2151
        async fn different_genesis_test() -> Result<()> {
2152
            // In this scenario a peer provides another genesis block than what has been
2153
            // hardcoded. This should lead to the closing of the connection to this peer
2154
            // and a ban.
2155

2156
            let network = Network::Main;
2157
            let (
2158
                _peer_broadcast_tx,
2159
                from_main_rx_clone,
2160
                to_main_tx,
2161
                mut to_main_rx1,
2162
                state_lock,
2163
                hsd,
2164
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2165
            assert_eq!(1000, state_lock.cli().peer_tolerance);
2166
            let peer_address = get_dummy_socket_address(0);
2167

2168
            // Although the database is empty, `get_latest_block` still returns the genesis block,
2169
            // since that block is hardcoded.
2170
            let mut different_genesis_block = state_lock
2171
                .lock_guard()
2172
                .await
2173
                .chain
2174
                .archival_state()
2175
                .get_tip()
2176
                .await;
2177

2178
            different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2179
            let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2180
                &different_genesis_block,
2181
                Timestamp::hours(1),
2182
                StdRng::seed_from_u64(5550001).random(),
2183
                network,
2184
            )
2185
            .await;
2186
            let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2187
                block_1_with_different_genesis.try_into().unwrap(),
2188
            )))]);
2189

2190
            let mut peer_loop_handler = PeerLoopHandler::new(
2191
                to_main_tx.clone(),
2192
                state_lock.clone(),
2193
                peer_address,
2194
                hsd,
2195
                true,
2196
                1,
2197
            );
2198
            let res = peer_loop_handler
2199
                .run_wrapper(mock, from_main_rx_clone)
2200
                .await;
2201
            assert!(
2202
                res.is_err(),
2203
                "run_wrapper must return failure when genesis is different"
2204
            );
2205

2206
            match to_main_rx1.recv().await {
2207
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2208
                _ => bail!("Must receive remove of peer block max height"),
2209
            }
2210

2211
            // Verify that no further message was sent to main loop
2212
            match to_main_rx1.try_recv() {
2213
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2214
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2215
            };
2216

2217
            drop(to_main_tx);
2218

2219
            let peer_standing = state_lock
2220
                .lock_guard()
2221
                .await
2222
                .net
2223
                .get_peer_standing_from_database(peer_address.ip())
2224
                .await;
2225
            assert_eq!(
2226
                -i32::from(state_lock.cli().peer_tolerance),
2227
                peer_standing.unwrap().standing
2228
            );
2229
            assert_eq!(
2230
                NegativePeerSanction::DifferentGenesis,
2231
                peer_standing.unwrap().latest_punishment.unwrap().0
2232
            );
2233

2234
            Ok(())
2235
        }
2236

2237
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_without_valid_pow_test() -> Result<()> {
            // In this scenario, a block without a valid PoW is received. This block should be rejected
            // by the peer loop and a notification should never reach the main loop.

            let network = Network::Main;
            let (
                peer_broadcast_tx,
                _from_main_rx_clone,
                to_main_tx,
                mut to_main_rx1,
                state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let peer_address = get_dummy_socket_address(0);
            let genesis_block: Block = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .get_tip()
                .await;

            // Make a block, then break its proof-of-work below so that its
            // hash lies above the threshold implied by its difficulty.
            let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;

            // This *probably* is invalid PoW -- and needs to be for this test to
            // work.
            block_without_valid_pow.set_header_nonce(Digest::default());

            // Sending an invalid block will not necessarily result in a ban. This depends on the peer
            // tolerance that is set in the client. For this reason, we include a "Bye" here.
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::Block(Box::new(
                    block_without_valid_pow.clone().try_into().unwrap(),
                ))),
                Action::Read(PeerMessage::Bye),
            ]);

            let from_main_rx_clone = peer_broadcast_tx.subscribe();

            // Mock the clock to the block's timestamp so timestamp validation
            // is not what rejects the block.
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                true,
                1,
                block_without_valid_pow.header().timestamp,
            );
            peer_loop_handler
                .run_wrapper(mock, from_main_rx_clone)
                .await
                .expect("sending (one) invalid block should not result in closed connection");

            match to_main_rx1.recv().await {
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
                _ => bail!("Must receive remove of peer block max height"),
            }

            // Verify that no further message was sent to main loop
            match to_main_rx1.try_recv() {
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
            };

            // We need to have the transmitter in scope until we have received from it
            // otherwise the receiver will report the disconnected error when we attempt
            // to read from it. And the purpose is to verify that the channel is empty,
            // not that it has been closed.
            drop(to_main_tx);

            // Verify that peer standing was stored in database
            let standing = state_lock
                .lock_guard()
                .await
                .net
                .peer_databases
                .peer_standings
                .get(peer_address.ip())
                .await
                .unwrap();
            assert!(
                standing.standing < 0,
                "Peer must be sanctioned for sending a bad block"
            );

            Ok(())
        }
2333

2334
        #[traced_test]
2335
        #[apply(shared_tokio_runtime)]
2336
        async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2337
            // The scenario tested here is that a client receives a block that is already
2338
            // known and stored. The expected behavior is to ignore the block and not send
2339
            // a message to the main task.
2340

2341
            let network = Network::Main;
2342
            let (
2343
                peer_broadcast_tx,
2344
                _from_main_rx_clone,
2345
                to_main_tx,
2346
                mut to_main_rx1,
2347
                mut alice,
2348
                hsd,
2349
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2350
            let peer_address = get_dummy_socket_address(0);
2351
            let genesis_block: Block = Block::genesis(network);
2352

2353
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
2354
            let block_1 =
2355
                fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2356
            assert!(
2357
                block_1.is_valid(&genesis_block, now, network).await,
2358
                "Block must be valid for this test to make sense"
2359
            );
2360
            alice.set_new_tip(block_1.clone()).await?;
2361

2362
            let mock_peer_messages = Mock::new(vec![
2363
                Action::Read(PeerMessage::Block(Box::new(
2364
                    block_1.clone().try_into().unwrap(),
2365
                ))),
2366
                Action::Read(PeerMessage::Bye),
2367
            ]);
2368

2369
            let from_main_rx_clone = peer_broadcast_tx.subscribe();
2370

2371
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2372
                to_main_tx.clone(),
2373
                alice.clone(),
2374
                peer_address,
2375
                hsd,
2376
                false,
2377
                1,
2378
                block_1.header().timestamp,
2379
            );
2380
            alice_peer_loop_handler
2381
                .run_wrapper(mock_peer_messages, from_main_rx_clone)
2382
                .await?;
2383

2384
            match to_main_rx1.recv().await {
2385
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2386
                other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2387
            }
2388
            match to_main_rx1.try_recv() {
2389
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2390
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2391
            };
2392
            drop(to_main_tx);
2393

2394
            if !alice.lock_guard().await.net.peer_map.is_empty() {
2395
                bail!("peer map must be empty after closing connection gracefully");
2396
            }
2397

2398
            Ok(())
2399
        }
2400

2401
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_request_batch_simple() {
            // Scenario: Six blocks (including genesis) are known. Peer requests
            // from all possible starting points, and client responds with the
            // correct list of blocks.
            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                _to_main_rx1,
                mut state_lock,
                handshake,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
                .await
                .unwrap();
            let genesis_block: Block = Block::genesis(network);
            let peer_address = get_dummy_socket_address(0);
            let [block_1, block_2, block_3, block_4, block_5] =
                fake_valid_sequence_of_blocks_for_tests(
                    &genesis_block,
                    Timestamp::hours(1),
                    StdRng::seed_from_u64(5550001).random(),
                    network,
                )
                .await;
            let blocks = vec![
                genesis_block,
                block_1,
                block_2,
                block_3,
                block_4,
                block_5.clone(),
            ];
            // Store all blocks except genesis (which is hardcoded).
            for block in blocks.iter().skip(1) {
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
            }

            // Anchor MMR accumulator for the batch requests, taken from the
            // archival block MMR at the current tip.
            let mmra = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .archival_block_mmr
                .ammr()
                .to_accumulator_async()
                .await;
            // Request a batch starting from each possible known block
            // (heights 0 through 4); the expected response is always the
            // remainder of the chain above the starting point.
            for i in 0..=4 {
                let expected_response = {
                    let state = state_lock.lock_guard().await;
                    let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
                    PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
                        .await
                        .unwrap()
                };
                let mock = Mock::new(vec![
                    Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                        known_blocks: vec![blocks[i].hash()],
                        max_response_len: 14,
                        anchor: mmra.clone(),
                    })),
                    Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
                    Action::Read(PeerMessage::Bye),
                ]);
                let mut peer_loop_handler = PeerLoopHandler::new(
                    to_main_tx.clone(),
                    state_lock.clone(),
                    peer_address,
                    handshake,
                    false,
                    1,
                );

                peer_loop_handler
                    .run_wrapper(mock, from_main_rx_clone.resubscribe())
                    .await
                    .unwrap();
            }
        }
2481

2482
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_request_batch_in_order_test() -> Result<()> {
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
            // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
            // are returned.

            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                _to_main_rx1,
                mut state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let genesis_block: Block = Block::genesis(network);
            let peer_address = get_dummy_socket_address(0);
            // Chain a: blocks 1, 2a, 3a on top of genesis.
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;
            // Chain b: fork from block 1, with blocks 2b, 3b.
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
                &block_1,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550002).random(),
                network,
            )
            .await;
            assert_ne!(block_2_b.hash(), block_2_a.hash());

            // Apply tips so that 3a ends up canonical (applied last).
            state_lock.set_new_tip(block_1.clone()).await?;
            state_lock.set_new_tip(block_2_a.clone()).await?;
            state_lock.set_new_tip(block_2_b.clone()).await?;
            state_lock.set_new_tip(block_3_b.clone()).await?;
            state_lock.set_new_tip(block_3_a.clone()).await?;

            // Anchor MMR accumulator from the archival block MMR at the tip.
            let anchor = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .archival_block_mmr
                .ammr()
                .to_accumulator_async()
                .await;
            // Expected response when the peer only knows genesis: the full
            // canonical chain 1, 2a, 3a.
            let response_1 = {
                let state_lock = state_lock.lock_guard().await;
                PeerLoopHandler::batch_response(
                    &state_lock,
                    vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
                    &anchor,
                )
                .await
                .unwrap()
            };

            let mut mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![genesis_block.hash()],
                    max_response_len: 14,
                    anchor: anchor.clone(),
                })),
                Action::Write(PeerMessage::BlockResponseBatch(response_1)),
                Action::Read(PeerMessage::Bye),
            ]);

            let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                false,
                1,
                block_3_a.header().timestamp,
            );

            peer_loop_handler_1
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
                .await?;

            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
            let response_2 = {
                let state_lock = state_lock.lock_guard().await;
                PeerLoopHandler::batch_response(
                    &state_lock,
                    vec![block_2_a, block_3_a.clone()],
                    &anchor,
                )
                .await
                .unwrap()
            };
            mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
                    max_response_len: 14,
                    anchor,
                })),
                Action::Write(PeerMessage::BlockResponseBatch(response_2)),
                Action::Read(PeerMessage::Bye),
            ]);

            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                false,
                1,
                block_3_a.header().timestamp,
            );

            peer_loop_handler_2
                .run_wrapper(mock, from_main_rx_clone)
                .await?;

            Ok(())
        }
2603

2604
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_request_batch_out_of_order_test() -> Result<()> {
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
            // A peer requests a batch of blocks starting from block 1, but the peer supplies their
            // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
            // The blocks will be supplied in the correct order but starting from the first digest in
            // the list that is known and canonical.

            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                _to_main_rx1,
                mut state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let genesis_block = Block::genesis(network);
            let peer_address = get_dummy_socket_address(0);
            // Chain a: blocks 1, 2a, 3a on top of genesis.
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;
            // Chain b: fork from block 1, with blocks 2b, 3b.
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
                &block_1,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550002).random(),
                network,
            )
            .await;
            assert_ne!(block_2_a.hash(), block_2_b.hash());

            // Apply tips so that 3a ends up canonical (applied last).
            state_lock.set_new_tip(block_1.clone()).await?;
            state_lock.set_new_tip(block_2_a.clone()).await?;
            state_lock.set_new_tip(block_2_b.clone()).await?;
            state_lock.set_new_tip(block_3_b.clone()).await?;
            state_lock.set_new_tip(block_3_a.clone()).await?;

            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
            let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
            expected_anchor.append(block_3_a.hash());
            let state_anchor = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .archival_block_mmr
                .ammr()
                .to_accumulator_async()
                .await;
            assert_eq!(
                expected_anchor, state_anchor,
                "Catching assumption about MMRA in tip and in archival state"
            );

            // Expected batch: canonical chain starting right above the first
            // known-and-canonical digest (genesis), i.e. blocks 1, 2a, 3a.
            let response = {
                let state_lock = state_lock.lock_guard().await;
                PeerLoopHandler::batch_response(
                    &state_lock,
                    vec![block_1.clone(), block_2_a, block_3_a.clone()],
                    &expected_anchor,
                )
                .await
                .unwrap()
            };
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
                    max_response_len: 14,
                    anchor: expected_anchor,
                })),
                // Since genesis block is the 1st known block in the list of known blocks,
                // its immediate descendant, block_1, is the first one returned.
                Action::Write(PeerMessage::BlockResponseBatch(response)),
                Action::Read(PeerMessage::Bye),
            ]);

            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                false,
                1,
                block_3_a.header().timestamp,
            );

            peer_loop_handler_2
                .run_wrapper(mock, from_main_rx_clone)
                .await?;

            Ok(())
        }
        #[traced_test]
2703
        #[apply(shared_tokio_runtime)]
2704
        async fn request_unknown_height_doesnt_crash() {
2705
            // Scenario: Only genesis block is known. Peer requests block of height
2706
            // 2.
2707
            let network = Network::Main;
2708
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2709
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2710
                    .await
2711
                    .unwrap();
2712
            let peer_address = get_dummy_socket_address(0);
2713
            let mock = Mock::new(vec![
2714
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2715
                Action::Read(PeerMessage::Bye),
2716
            ]);
2717

2718
            let mut peer_loop_handler = PeerLoopHandler::new(
2719
                to_main_tx.clone(),
2720
                state_lock.clone(),
2721
                peer_address,
2722
                hsd,
2723
                false,
2724
                1,
2725
            );
2726

2727
            // This will return error if seen read/write order does not match that of the
2728
            // mocked object.
2729
            peer_loop_handler
2730
                .run_wrapper(mock, from_main_rx_clone)
2731
                .await
2732
                .unwrap();
2733

2734
            // Verify that peer is sanctioned for this nonsense.
2735
            assert!(state_lock
2736
                .lock_guard()
2737
                .await
2738
                .net
2739
                .get_peer_standing_from_database(peer_address.ip())
2740
                .await
2741
                .unwrap()
2742
                .standing
2743
                .is_negative());
2744
        }
        #[traced_test]
2747
        #[apply(shared_tokio_runtime)]
2748
        async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
2749
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2750
            // A peer requests a block at height 2. Verify that the correct block at height 2 is
2751
            // returned.
2752

2753
            let network = Network::Main;
2754
            let (
2755
                _peer_broadcast_tx,
2756
                from_main_rx_clone,
2757
                to_main_tx,
2758
                _to_main_rx1,
2759
                mut state_lock,
2760
                hsd,
2761
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2762
            let genesis_block = Block::genesis(network);
2763
            let peer_address = get_dummy_socket_address(0);
2764

2765
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2766
                &genesis_block,
2767
                Timestamp::hours(1),
2768
                StdRng::seed_from_u64(5550001).random(),
2769
                network,
2770
            )
2771
            .await;
2772
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2773
                &block_1,
2774
                Timestamp::hours(1),
2775
                StdRng::seed_from_u64(5550002).random(),
2776
                network,
2777
            )
2778
            .await;
2779
            assert_ne!(block_2_a.hash(), block_2_b.hash());
2780

2781
            state_lock.set_new_tip(block_1.clone()).await?;
2782
            state_lock.set_new_tip(block_2_a.clone()).await?;
2783
            state_lock.set_new_tip(block_2_b.clone()).await?;
2784
            state_lock.set_new_tip(block_3_b.clone()).await?;
2785
            state_lock.set_new_tip(block_3_a.clone()).await?;
2786

2787
            let mock = Mock::new(vec![
2788
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2789
                Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
2790
                Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
2791
                Action::Write(PeerMessage::Block(Box::new(
2792
                    block_3_a.clone().try_into().unwrap(),
2793
                ))),
2794
                Action::Read(PeerMessage::Bye),
2795
            ]);
2796

2797
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2798
                to_main_tx.clone(),
2799
                state_lock.clone(),
2800
                peer_address,
2801
                hsd,
2802
                false,
2803
                1,
2804
                block_3_a.header().timestamp,
2805
            );
2806

2807
            // This will return error if seen read/write order does not match that of the
2808
            // mocked object.
2809
            peer_loop_handler
2810
                .run_wrapper(mock, from_main_rx_clone)
2811
                .await?;
2812

2813
            Ok(())
2814
        }
        #[traced_test]
2817
        #[apply(shared_tokio_runtime)]
2818
        async fn receival_of_block_notification_height_1() {
2819
            // Scenario: client only knows genesis block. Then receives block
2820
            // notification of height 1. Must request block 1.
2821
            let network = Network::Main;
2822
            let mut rng = StdRng::seed_from_u64(5552401);
2823
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2824
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2825
                    .await
2826
                    .unwrap();
2827
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2828
            let notification_height1 = (&block_1).into();
2829
            let mock = Mock::new(vec![
2830
                Action::Read(PeerMessage::BlockNotification(notification_height1)),
2831
                Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2832
                Action::Read(PeerMessage::Bye),
2833
            ]);
2834

2835
            let peer_address = get_dummy_socket_address(0);
2836
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2837
                to_main_tx.clone(),
2838
                state_lock.clone(),
2839
                peer_address,
2840
                hsd,
2841
                false,
2842
                1,
2843
                block_1.header().timestamp,
2844
            );
2845
            peer_loop_handler
2846
                .run_wrapper(mock, from_main_rx_clone)
2847
                .await
2848
                .unwrap();
2849

2850
            drop(to_main_rx1);
2851
        }
        #[traced_test]
2854
        #[apply(shared_tokio_runtime)]
2855
        async fn receive_block_request_by_height_block_7() {
2856
            // Scenario: client only knows blocks up to height 7. Then receives block-
2857
            // request-by-height for height 7. Must respond with block 7.
2858
            let network = Network::Main;
2859
            let mut rng = StdRng::seed_from_u64(5552401);
2860
            let (
2861
                _peer_broadcast_tx,
2862
                from_main_rx_clone,
2863
                to_main_tx,
2864
                to_main_rx1,
2865
                mut state_lock,
2866
                hsd,
2867
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
2868
                .await
2869
                .unwrap();
2870
            let genesis_block = Block::genesis(network);
2871
            let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2872
                &genesis_block,
2873
                Timestamp::hours(1),
2874
                rng.random(),
2875
                network,
2876
            )
2877
            .await;
2878
            let block7 = blocks.last().unwrap().to_owned();
2879
            let tip_height: u64 = block7.header().height.into();
2880
            assert_eq!(7, tip_height);
2881

2882
            for block in &blocks {
2883
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
2884
            }
2885

2886
            let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2887
            let mock = Mock::new(vec![
2888
                Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2889
                Action::Write(block7_response),
2890
                Action::Read(PeerMessage::Bye),
2891
            ]);
2892

2893
            let peer_address = get_dummy_socket_address(0);
2894
            let mut peer_loop_handler = PeerLoopHandler::new(
2895
                to_main_tx.clone(),
2896
                state_lock.clone(),
2897
                peer_address,
2898
                hsd,
2899
                false,
2900
                1,
2901
            );
2902
            peer_loop_handler
2903
                .run_wrapper(mock, from_main_rx_clone)
2904
                .await
2905
                .unwrap();
2906

2907
            drop(to_main_rx1);
2908
        }
        #[traced_test]
2911
        #[apply(shared_tokio_runtime)]
2912
        async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2913
            // Scenario: client only knows genesis block. Then receives block 1.
2914

2915
            let network = Network::Main;
2916
            let mut rng = StdRng::seed_from_u64(5550001);
2917
            let (
2918
                _peer_broadcast_tx,
2919
                from_main_rx_clone,
2920
                to_main_tx,
2921
                mut to_main_rx1,
2922
                state_lock,
2923
                hsd,
2924
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2925
            let peer_address = get_dummy_socket_address(0);
2926

2927
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2928
            let mock = Mock::new(vec![
2929
                Action::Read(PeerMessage::Block(Box::new(
2930
                    block_1.clone().try_into().unwrap(),
2931
                ))),
2932
                Action::Read(PeerMessage::Bye),
2933
            ]);
2934

2935
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2936
                to_main_tx.clone(),
2937
                state_lock.clone(),
2938
                peer_address,
2939
                hsd,
2940
                false,
2941
                1,
2942
                block_1.header().timestamp,
2943
            );
2944
            peer_loop_handler
2945
                .run_wrapper(mock, from_main_rx_clone)
2946
                .await?;
2947

2948
            // Verify that a block was sent to `main_loop`
2949
            match to_main_rx1.recv().await {
2950
                Some(PeerTaskToMain::NewBlocks(_block)) => (),
2951
                _ => bail!("Did not find msg sent to main task"),
2952
            };
2953

2954
            match to_main_rx1.recv().await {
2955
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2956
                _ => bail!("Must receive remove of peer block max height"),
2957
            }
2958

2959
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2960
                bail!("peer map must be empty after closing connection gracefully");
2961
            }
2962

2963
            Ok(())
2964
        }
        #[traced_test]
2967
        #[apply(shared_tokio_runtime)]
2968
        async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
2969
            // In this scenario, the client only knows the genesis block (block 0) and then
2970
            // receives block 2, meaning that block 1 will have to be requested.
2971

2972
            let network = Network::Main;
2973
            let (
2974
                _peer_broadcast_tx,
2975
                from_main_rx_clone,
2976
                to_main_tx,
2977
                mut to_main_rx1,
2978
                state_lock,
2979
                hsd,
2980
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2981
            let peer_address = get_dummy_socket_address(0);
2982
            let genesis_block: Block = state_lock
2983
                .lock_guard()
2984
                .await
2985
                .chain
2986
                .archival_state()
2987
                .get_tip()
2988
                .await;
2989
            let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
2990
                &genesis_block,
2991
                Timestamp::hours(1),
2992
                StdRng::seed_from_u64(5550001).random(),
2993
                network,
2994
            )
2995
            .await;
2996

2997
            let mock = Mock::new(vec![
2998
                Action::Read(PeerMessage::Block(Box::new(
2999
                    block_2.clone().try_into().unwrap(),
3000
                ))),
3001
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3002
                Action::Read(PeerMessage::Block(Box::new(
3003
                    block_1.clone().try_into().unwrap(),
3004
                ))),
3005
                Action::Read(PeerMessage::Bye),
3006
            ]);
3007

3008
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3009
                to_main_tx.clone(),
3010
                state_lock.clone(),
3011
                peer_address,
3012
                hsd,
3013
                true,
3014
                1,
3015
                block_2.header().timestamp,
3016
            );
3017
            peer_loop_handler
3018
                .run_wrapper(mock, from_main_rx_clone)
3019
                .await?;
3020

3021
            match to_main_rx1.recv().await {
3022
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3023
                    if blocks[0].hash() != block_1.hash() {
3024
                        bail!("1st received block by main loop must be block 1");
3025
                    }
3026
                    if blocks[1].hash() != block_2.hash() {
3027
                        bail!("2nd received block by main loop must be block 2");
3028
                    }
3029
                }
3030
                _ => bail!("Did not find msg sent to main task 1"),
3031
            };
3032
            match to_main_rx1.recv().await {
3033
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3034
                _ => bail!("Must receive remove of peer block max height"),
3035
            }
3036

3037
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3038
                bail!("peer map must be empty after closing connection gracefully");
3039
            }
3040

3041
            Ok(())
3042
        }
        #[traced_test]
3045
        #[apply(shared_tokio_runtime)]
3046
        async fn prevent_ram_exhaustion_test() -> Result<()> {
3047
            // In this scenario the peer sends more blocks than the client allows to store in the
3048
            // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
3049
            // process as the alternative is that the program will crash because it runs out of RAM.
3050

3051
            let network = Network::Main;
3052
            let mut rng = StdRng::seed_from_u64(5550001);
3053
            let (
3054
                _peer_broadcast_tx,
3055
                from_main_rx_clone,
3056
                to_main_tx,
3057
                mut to_main_rx1,
3058
                mut state_lock,
3059
                _hsd,
3060
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3061
            let genesis_block = Block::genesis(network);
3062

3063
            // Restrict max number of blocks held in memory to 2.
3064
            let mut cli = state_lock.cli().clone();
3065
            cli.sync_mode_threshold = 2;
3066
            state_lock.set_cli(cli).await;
3067

3068
            let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(network, 1);
3069
            let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3070
                &genesis_block,
3071
                Timestamp::hours(1),
3072
                rng.random(),
3073
                network,
3074
            )
3075
            .await;
3076
            state_lock.set_new_tip(block_1.clone()).await?;
3077

3078
            let mock = Mock::new(vec![
3079
                Action::Read(PeerMessage::Block(Box::new(
3080
                    block_4.clone().try_into().unwrap(),
3081
                ))),
3082
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3083
                Action::Read(PeerMessage::Block(Box::new(
3084
                    block_3.clone().try_into().unwrap(),
3085
                ))),
3086
                Action::Read(PeerMessage::Bye),
3087
            ]);
3088

3089
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3090
                to_main_tx.clone(),
3091
                state_lock.clone(),
3092
                peer_address1,
3093
                hsd1,
3094
                true,
3095
                1,
3096
                block_4.header().timestamp,
3097
            );
3098
            peer_loop_handler
3099
                .run_wrapper(mock, from_main_rx_clone)
3100
                .await?;
3101

3102
            match to_main_rx1.recv().await {
3103
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3104
                _ => bail!("Must receive remove of peer block max height"),
3105
            }
3106

3107
            // Verify that no block is sent to main loop.
3108
            match to_main_rx1.try_recv() {
3109
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
3110
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
3111
        };
3112
            drop(to_main_tx);
3113

3114
            // Verify that peer is sanctioned for failed fork reconciliation attempt
3115
            assert!(state_lock
3116
                .lock_guard()
3117
                .await
3118
                .net
3119
                .get_peer_standing_from_database(peer_address1.ip())
3120
                .await
3121
                .unwrap()
3122
                .standing
3123
                .is_negative());
3124

3125
            Ok(())
3126
        }
        #[traced_test]
3129
        #[apply(shared_tokio_runtime)]
3130
        async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3131
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3132
            // then receives block 4, meaning that block 3 and 2 will have to be requested.
3133

3134
            let network = Network::Main;
3135
            let (
3136
                _peer_broadcast_tx,
3137
                from_main_rx_clone,
3138
                to_main_tx,
3139
                mut to_main_rx1,
3140
                mut state_lock,
3141
                hsd,
3142
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3143
                .await
3144
                .unwrap();
3145
            let peer_address: SocketAddr = get_dummy_socket_address(0);
3146
            let genesis_block = Block::genesis(network);
3147
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3148
                &genesis_block,
3149
                Timestamp::hours(1),
3150
                StdRng::seed_from_u64(5550001).random(),
3151
                network,
3152
            )
3153
            .await;
3154
            state_lock.set_new_tip(block_1.clone()).await.unwrap();
3155

3156
            let mock = Mock::new(vec![
3157
                Action::Read(PeerMessage::Block(Box::new(
3158
                    block_4.clone().try_into().unwrap(),
3159
                ))),
3160
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3161
                Action::Read(PeerMessage::Block(Box::new(
3162
                    block_3.clone().try_into().unwrap(),
3163
                ))),
3164
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3165
                Action::Read(PeerMessage::Block(Box::new(
3166
                    block_2.clone().try_into().unwrap(),
3167
                ))),
3168
                Action::Read(PeerMessage::Bye),
3169
            ]);
3170

3171
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3172
                to_main_tx.clone(),
3173
                state_lock.clone(),
3174
                peer_address,
3175
                hsd,
3176
                true,
3177
                1,
3178
                block_4.header().timestamp,
3179
            );
3180
            peer_loop_handler
3181
                .run_wrapper(mock, from_main_rx_clone)
3182
                .await
3183
                .unwrap();
3184

3185
            let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3186
                panic!("Did not find msg sent to main task");
3187
            };
3188
            assert_eq!(blocks[0].hash(), block_2.hash());
3189
            assert_eq!(blocks[1].hash(), block_3.hash());
3190
            assert_eq!(blocks[2].hash(), block_4.hash());
3191

3192
            let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3193
                panic!("Must receive remove of peer block max height");
3194
            };
3195

3196
            assert!(
3197
                state_lock.lock_guard().await.net.peer_map.is_empty(),
3198
                "peer map must be empty after closing connection gracefully"
3199
            );
3200
        }
        #[traced_test]
3203
        #[apply(shared_tokio_runtime)]
3204
        async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
3205
            // In this scenario, the client only knows the genesis block (block 0) and then
3206
            // receives block 3, meaning that block 2 and 1 will have to be requested.
3207

3208
            let network = Network::Main;
3209
            let (
3210
                _peer_broadcast_tx,
3211
                from_main_rx_clone,
3212
                to_main_tx,
3213
                mut to_main_rx1,
3214
                state_lock,
3215
                hsd,
3216
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3217
            let peer_address = get_dummy_socket_address(0);
3218
            let genesis_block = Block::genesis(network);
3219

3220
            let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
3221
                &genesis_block,
3222
                Timestamp::hours(1),
3223
                StdRng::seed_from_u64(5550001).random(),
3224
                network,
3225
            )
3226
            .await;
3227

3228
            let mock = Mock::new(vec![
3229
                Action::Read(PeerMessage::Block(Box::new(
3230
                    block_3.clone().try_into().unwrap(),
3231
                ))),
3232
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3233
                Action::Read(PeerMessage::Block(Box::new(
3234
                    block_2.clone().try_into().unwrap(),
3235
                ))),
3236
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3237
                Action::Read(PeerMessage::Block(Box::new(
3238
                    block_1.clone().try_into().unwrap(),
3239
                ))),
3240
                Action::Read(PeerMessage::Bye),
3241
            ]);
3242

3243
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3244
                to_main_tx.clone(),
3245
                state_lock.clone(),
3246
                peer_address,
3247
                hsd,
3248
                true,
3249
                1,
3250
                block_3.header().timestamp,
3251
            );
3252
            peer_loop_handler
3253
                .run_wrapper(mock, from_main_rx_clone)
3254
                .await?;
3255

3256
            match to_main_rx1.recv().await {
3257
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3258
                    if blocks[0].hash() != block_1.hash() {
3259
                        bail!("1st received block by main loop must be block 1");
3260
                    }
3261
                    if blocks[1].hash() != block_2.hash() {
3262
                        bail!("2nd received block by main loop must be block 2");
3263
                    }
3264
                    if blocks[2].hash() != block_3.hash() {
3265
                        bail!("3rd received block by main loop must be block 3");
3266
                    }
3267
                }
3268
                _ => bail!("Did not find msg sent to main task"),
3269
            };
3270
            match to_main_rx1.recv().await {
3271
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3272
                _ => bail!("Must receive remove of peer block max height"),
3273
            }
3274

3275
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3276
                bail!("peer map must be empty after closing connection gracefully");
3277
            }
3278

3279
            Ok(())
3280
        }
        #[traced_test]
3283
        #[apply(shared_tokio_runtime)]
3284
        async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3285
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3286
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3287
            // But the requests are interrupted by the peer sending another message: a new block
3288
            // notification.
3289

3290
            let network = Network::Main;
3291
            let (
3292
                _peer_broadcast_tx,
3293
                from_main_rx_clone,
3294
                to_main_tx,
3295
                mut to_main_rx1,
3296
                mut state_lock,
3297
                hsd,
3298
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3299
            let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3300
            let genesis_block: Block = state_lock
3301
                .lock_guard()
3302
                .await
3303
                .chain
3304
                .archival_state()
3305
                .get_tip()
3306
                .await;
3307

3308
            let [block_1, block_2, block_3, block_4, block_5] =
3309
                fake_valid_sequence_of_blocks_for_tests(
3310
                    &genesis_block,
3311
                    Timestamp::hours(1),
3312
                    StdRng::seed_from_u64(5550001).random(),
3313
                    network,
3314
                )
3315
                .await;
3316
            state_lock.set_new_tip(block_1.clone()).await?;
3317

3318
            let mock = Mock::new(vec![
3319
                Action::Read(PeerMessage::Block(Box::new(
3320
                    block_4.clone().try_into().unwrap(),
3321
                ))),
3322
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3323
                Action::Read(PeerMessage::Block(Box::new(
3324
                    block_3.clone().try_into().unwrap(),
3325
                ))),
3326
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3327
                //
3328
                // Now make the interruption of the block reconciliation process
3329
                Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3330
                //
3331
                // Complete the block reconciliation process by requesting the last block
3332
                // in this process, to get back to a mutually known block.
3333
                Action::Read(PeerMessage::Block(Box::new(
3334
                    block_2.clone().try_into().unwrap(),
3335
                ))),
3336
                //
3337
                // Then anticipate the request of the block that was announced
3338
                // in the interruption.
3339
                // Note that we cannot anticipate the response, as only the main
3340
                // task writes to the database. And the database needs to be updated
3341
                // for the handling of block 5 to be done correctly.
3342
                Action::Write(PeerMessage::BlockRequestByHeight(
3343
                    block_5.kernel.header.height,
3344
                )),
3345
                Action::Read(PeerMessage::Bye),
3346
            ]);
3347

3348
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3349
                to_main_tx.clone(),
3350
                state_lock.clone(),
3351
                peer_socket_address,
3352
                hsd,
3353
                false,
3354
                1,
3355
                block_5.header().timestamp,
3356
            );
3357
            peer_loop_handler
3358
                .run_wrapper(mock, from_main_rx_clone)
3359
                .await?;
3360

3361
            match to_main_rx1.recv().await {
3362
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3363
                    if blocks[0].hash() != block_2.hash() {
3364
                        bail!("1st received block by main loop must be block 1");
3365
                    }
3366
                    if blocks[1].hash() != block_3.hash() {
3367
                        bail!("2nd received block by main loop must be block 2");
3368
                    }
3369
                    if blocks[2].hash() != block_4.hash() {
3370
                        bail!("3rd received block by main loop must be block 3");
3371
                    }
3372
                }
3373
                _ => bail!("Did not find msg sent to main task"),
3374
            };
3375
            match to_main_rx1.recv().await {
3376
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3377
                _ => bail!("Must receive remove of peer block max height"),
3378
            }
3379

3380
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3381
                bail!("peer map must be empty after closing connection gracefully");
3382
            }
3383

3384
            Ok(())
3385
        }
        #[traced_test]
3388
        #[apply(shared_tokio_runtime)]
3389
        async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3390
            // In this scenario, the client knows the genesis block (block 0) and block 1, it
3391
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3392
            // But the requests are interrupted by the peer sending another message: a request
3393
            // for a list of peers.
3394

3395
            let network = Network::Main;
3396
            let (
3397
                _peer_broadcast_tx,
3398
                from_main_rx_clone,
3399
                to_main_tx,
3400
                mut to_main_rx1,
3401
                mut state_lock,
3402
                _hsd,
3403
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3404
            let genesis_block = Block::genesis(network);
3405
            let peer_infos: Vec<PeerInfo> = state_lock
3406
                .lock_guard()
3407
                .await
3408
                .net
3409
                .peer_map
3410
                .clone()
3411
                .into_values()
3412
                .collect::<Vec<_>>();
3413

3414
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3415
                &genesis_block,
3416
                Timestamp::hours(1),
3417
                StdRng::seed_from_u64(5550001).random(),
3418
                network,
3419
            )
3420
            .await;
3421
            state_lock.set_new_tip(block_1.clone()).await?;
3422

3423
            let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3424
            let expected_peer_list_resp = vec![
3425
                (
3426
                    peer_infos[0].listen_address().unwrap(),
3427
                    peer_infos[0].instance_id(),
3428
                ),
3429
                (sa_1, hsd_1.instance_id),
3430
            ];
3431
            let mock = Mock::new(vec![
3432
                Action::Read(PeerMessage::Block(Box::new(
3433
                    block_4.clone().try_into().unwrap(),
3434
                ))),
3435
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3436
                Action::Read(PeerMessage::Block(Box::new(
3437
                    block_3.clone().try_into().unwrap(),
3438
                ))),
3439
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3440
                //
3441
                // Now make the interruption of the block reconciliation process
3442
                Action::Read(PeerMessage::PeerListRequest),
3443
                //
3444
                // Answer the request for a peer list
3445
                Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3446
                //
3447
                // Complete the block reconciliation process by requesting the last block
3448
                // in this process, to get back to a mutually known block.
3449
                Action::Read(PeerMessage::Block(Box::new(
3450
                    block_2.clone().try_into().unwrap(),
3451
                ))),
3452
                Action::Read(PeerMessage::Bye),
3453
            ]);
3454

3455
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3456
                to_main_tx,
3457
                state_lock.clone(),
3458
                sa_1,
3459
                hsd_1,
3460
                true,
3461
                1,
3462
                block_4.header().timestamp,
3463
            );
3464
            peer_loop_handler
3465
                .run_wrapper(mock, from_main_rx_clone)
3466
                .await?;
3467

3468
            // Verify that blocks are sent to `main_loop` in expected ordering
3469
            match to_main_rx1.recv().await {
3470
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3471
                    if blocks[0].hash() != block_2.hash() {
3472
                        bail!("1st received block by main loop must be block 1");
3473
                    }
3474
                    if blocks[1].hash() != block_3.hash() {
3475
                        bail!("2nd received block by main loop must be block 2");
3476
                    }
3477
                    if blocks[2].hash() != block_4.hash() {
3478
                        bail!("3rd received block by main loop must be block 3");
3479
                    }
3480
                }
3481
                _ => bail!("Did not find msg sent to main task"),
3482
            };
3483

3484
            assert_eq!(
3485
                1,
3486
                state_lock.lock_guard().await.net.peer_map.len(),
3487
                "One peer must remain in peer list after peer_1 closed gracefully"
3488
            );
3489

3490
            Ok(())
3491
        }
3492
    }
3493

3494
    mod transactions {
3495
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToProofCollection;
3496
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToSingleProof;
3497
        use crate::models::blockchain::transaction::primitive_witness::PrimitiveWitness;
3498
        use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
3499
        use crate::tests::shared::blocks::fake_deterministic_successor;
3500
        use crate::tests::shared::mock_tx::genesis_tx_with_proof_type;
3501
        use crate::triton_vm_job_queue::vm_job_queue;
3502

3503
        use super::*;
3504

3505
        #[traced_test]
3506
        #[apply(shared_tokio_runtime)]
3507
        async fn receive_transaction_request() {
3508
            let network = Network::Main;
3509
            let dummy_tx = invalid_empty_single_proof_transaction();
3510
            let txid = dummy_tx.kernel.txid();
3511

3512
            for transaction_is_known in [false, true] {
3513
                let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3514
                    get_test_genesis_setup(network, 1, cli_args::Args::default())
3515
                        .await
3516
                        .unwrap();
3517
                if transaction_is_known {
3518
                    state_lock
3519
                        .lock_guard_mut()
3520
                        .await
3521
                        .mempool_insert(dummy_tx.clone(), UpgradePriority::Irrelevant)
3522
                        .await;
3523
                }
3524

3525
                let mock = if transaction_is_known {
3526
                    Mock::new(vec![
3527
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3528
                        Action::Write(PeerMessage::Transaction(Box::new(
3529
                            (&dummy_tx).try_into().unwrap(),
3530
                        ))),
3531
                        Action::Read(PeerMessage::Bye),
3532
                    ])
3533
                } else {
3534
                    Mock::new(vec![
3535
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3536
                        Action::Read(PeerMessage::Bye),
3537
                    ])
3538
                };
3539

3540
                let hsd = get_dummy_handshake_data_for_genesis(network);
3541
                let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3542
                let mut peer_loop_handler = PeerLoopHandler::new(
3543
                    to_main_tx,
3544
                    state_lock,
3545
                    get_dummy_socket_address(0),
3546
                    hsd,
3547
                    true,
3548
                    1,
3549
                );
3550

3551
                peer_loop_handler
3552
                    .run(mock, from_main_rx, &mut peer_state)
3553
                    .await
3554
                    .unwrap();
3555
            }
3556
        }
3557

3558
        #[traced_test]
3559
        #[apply(shared_tokio_runtime)]
3560
        async fn empty_mempool_request_tx_test() {
3561
            // In this scenario the client receives a transaction notification from
3562
            // a peer of a transaction it doesn't know; the client must then request it.
3563

3564
            let network = Network::Main;
3565
            let (
3566
                _peer_broadcast_tx,
3567
                from_main_rx_clone,
3568
                to_main_tx,
3569
                mut to_main_rx1,
3570
                state_lock,
3571
                _hsd,
3572
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3573
                .await
3574
                .unwrap();
3575

3576
            let spending_key = state_lock
3577
                .lock_guard()
3578
                .await
3579
                .wallet_state
3580
                .wallet_entropy
3581
                .nth_symmetric_key_for_tests(0);
3582
            let genesis_block = Block::genesis(network);
3583
            let now = genesis_block.kernel.header.timestamp;
3584
            let config = TxCreationConfig::default()
3585
                .recover_change_off_chain(spending_key.into())
3586
                .with_prover_capability(TxProvingCapability::ProofCollection);
3587
            let consensus_rule_set = ConsensusRuleSet::infer_from(network, BlockHeight::genesis());
3588
            let transaction_1: Transaction = state_lock
3589
                .api()
3590
                .tx_initiator_internal()
3591
                .create_transaction(
3592
                    Default::default(),
3593
                    NativeCurrencyAmount::coins(0),
3594
                    now,
3595
                    config,
3596
                    consensus_rule_set,
3597
                )
3598
                .await
3599
                .unwrap()
3600
                .transaction
3601
                .into();
3602

3603
            // Build the resulting transaction notification
3604
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3605
            let mock = Mock::new(vec![
3606
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3607
                Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3608
                Action::Read(PeerMessage::Transaction(Box::new(
3609
                    (&transaction_1).try_into().unwrap(),
3610
                ))),
3611
                Action::Read(PeerMessage::Bye),
3612
            ]);
3613

3614
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3615

3616
            // Mock a timestamp to allow transaction to be considered valid
3617
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3618
                to_main_tx,
3619
                state_lock.clone(),
3620
                get_dummy_socket_address(0),
3621
                hsd_1,
3622
                true,
3623
                1,
3624
                now,
3625
            );
3626

3627
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3628

3629
            assert!(
3630
                state_lock.lock_guard().await.mempool.is_empty(),
3631
                "Mempool must be empty at init"
3632
            );
3633
            peer_loop_handler
3634
                .run(mock, from_main_rx_clone, &mut peer_state)
3635
                .await
3636
                .unwrap();
3637

3638
            // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3639
            // by the `main_loop`.
3640
            match to_main_rx1.recv().await {
3641
                Some(PeerTaskToMain::Transaction(_)) => (),
3642
                _ => panic!("Must receive remove of peer block max height"),
3643
            };
3644
        }
3645

3646
        #[traced_test]
3647
        #[apply(shared_tokio_runtime)]
3648
        async fn populated_mempool_request_tx_test() -> Result<()> {
3649
            // In this scenario the peer is informed of a transaction that it already knows
3650

3651
            let network = Network::Main;
3652
            let (
3653
                _peer_broadcast_tx,
3654
                from_main_rx_clone,
3655
                to_main_tx,
3656
                mut to_main_rx1,
3657
                mut state_lock,
3658
                _hsd,
3659
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3660
                .await
3661
                .unwrap();
3662
            let spending_key = state_lock
3663
                .lock_guard()
3664
                .await
3665
                .wallet_state
3666
                .wallet_entropy
3667
                .nth_symmetric_key_for_tests(0);
3668

3669
            let genesis_block = Block::genesis(network);
3670
            let now = genesis_block.kernel.header.timestamp;
3671
            let config = TxCreationConfig::default()
3672
                .recover_change_off_chain(spending_key.into())
3673
                .with_prover_capability(TxProvingCapability::ProofCollection);
3674
            let consensus_rule_set = ConsensusRuleSet::infer_from(network, BlockHeight::genesis());
3675
            let transaction_1: Transaction = state_lock
3676
                .api()
3677
                .tx_initiator_internal()
3678
                .create_transaction(
3679
                    Default::default(),
3680
                    NativeCurrencyAmount::coins(0),
3681
                    now,
3682
                    config,
3683
                    consensus_rule_set,
3684
                )
3685
                .await
3686
                .unwrap()
3687
                .transaction
3688
                .into();
3689

3690
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3691
            let mut peer_loop_handler = PeerLoopHandler::new(
3692
                to_main_tx,
3693
                state_lock.clone(),
3694
                get_dummy_socket_address(0),
3695
                hsd_1,
3696
                true,
3697
                1,
3698
            );
3699
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3700

3701
            assert!(
3702
                state_lock.lock_guard().await.mempool.is_empty(),
3703
                "Mempool must be empty at init"
3704
            );
3705
            state_lock
3706
                .lock_guard_mut()
3707
                .await
3708
                .mempool_insert(transaction_1.clone(), UpgradePriority::Irrelevant)
3709
                .await;
3710
            assert!(
3711
                !state_lock.lock_guard().await.mempool.is_empty(),
3712
                "Mempool must be non-empty after insertion"
3713
            );
3714

3715
            // Run the peer loop and verify expected exchange -- namely that the
3716
            // tx notification is received and the the transaction is *not*
3717
            // requested.
3718
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3719
            let mock = Mock::new(vec![
3720
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3721
                Action::Read(PeerMessage::Bye),
3722
            ]);
3723
            peer_loop_handler
3724
                .run(mock, from_main_rx_clone, &mut peer_state)
3725
                .await
3726
                .unwrap();
3727

3728
            // nothing is allowed to be sent to `main_loop`
3729
            match to_main_rx1.try_recv() {
3730
                Err(TryRecvError::Empty) => (),
3731
                Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3732
                Ok(_) => panic!("to_main channel must be empty"),
3733
            };
3734
            Ok(())
3735
        }
3736

3737
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn accepts_tx_with_updated_mutator_set() {
            // Scenario: node has transaction in mempool and receives
            // transaction notification for the same transaction with an updated
            // mutator set. The node must request the new transaction and it
            // must be passed on to main loop.
            //
            // Both ProofCollection and SingleProof backed transactions are
            // tested.

            // Which proof backing to exercise in a given loop iteration.
            enum ProofType {
                ProofCollection,
                SingleProof,
            }

            let network = Network::Main;

            for proof_type in [ProofType::ProofCollection, ProofType::SingleProof] {
                let proof_job_options = TritonVmProofJobOptions::default();
                // Upgrade a primitive witness to the proof type under test.
                // Defined as an async closure so both the genesis-synced and
                // block1-synced witnesses go through the same path.
                let upgrade =
                    async |primitive_witness: PrimitiveWitness,
                           consensus_rule_set: ConsensusRuleSet| {
                        match proof_type {
                            ProofType::ProofCollection => {
                                PrimitiveWitnessToProofCollection { primitive_witness }
                                    .upgrade(vm_job_queue(), &proof_job_options)
                                    .await
                                    .unwrap()
                            }
                            ProofType::SingleProof => {
                                PrimitiveWitnessToSingleProof { primitive_witness }
                                    .upgrade(vm_job_queue(), &proof_job_options, consensus_rule_set)
                                    .await
                                    .unwrap()
                            }
                        }
                    };

                let (
                    _peer_broadcast_tx,
                    from_main_rx_clone,
                    to_main_tx,
                    mut to_main_rx1,
                    mut state_lock,
                    _hsd,
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
                    .await
                    .unwrap();
                let consensus_rule_set =
                    ConsensusRuleSet::infer_from(network, BlockHeight::genesis());
                let fee = NativeCurrencyAmount::from_nau(500);
                // Primitive witness for a transaction synced to the genesis
                // block's mutator set.
                let pw_genesis =
                    genesis_tx_with_proof_type(TxProvingCapability::PrimitiveWitness, network, fee)
                        .await
                        .proof
                        .clone()
                        .into_primitive_witness();

                let tx_synced_to_genesis = upgrade(pw_genesis.clone(), consensus_rule_set).await;

                // Advance the tip to block 1 so the mempool copy becomes
                // unsynced relative to the current mutator set.
                let genesis_block = Block::genesis(network);
                let block1 = fake_deterministic_successor(&genesis_block, network).await;
                state_lock
                    .lock_guard_mut()
                    .await
                    .set_new_tip(block1.clone())
                    .await
                    .unwrap();

                state_lock
                    .lock_guard_mut()
                    .await
                    .mempool_insert(tx_synced_to_genesis, UpgradePriority::Irrelevant)
                    .await;

                // Mempool should now contain the unsynced transaction. Tip is block 1.
                let pw_block1 =
                    pw_genesis.update_with_new_ms_data(block1.mutator_set_update().unwrap());
                let tx_synced_to_block1 = upgrade(pw_block1, consensus_rule_set).await;

                // The peer announces the updated transaction; the node must
                // request it and accept the transfer.
                let tx_notification: TransactionNotification =
                    (&tx_synced_to_block1).try_into().unwrap();
                let mock = Mock::new(vec![
                    Action::Read(PeerMessage::TransactionNotification(tx_notification)),
                    Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
                    Action::Read(PeerMessage::Transaction(Box::new(
                        (&tx_synced_to_block1).try_into().unwrap(),
                    ))),
                    Action::Read(PeerMessage::Bye),
                ]);

                // Mock a timestamp to allow transaction to be considered valid
                let now = tx_synced_to_block1.kernel.timestamp;
                let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
                    to_main_tx,
                    state_lock.clone(),
                    get_dummy_socket_address(0),
                    hsd_1,
                    true,
                    1,
                    now,
                );

                let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
                peer_loop_handler
                    .run(mock, from_main_rx_clone, &mut peer_state)
                    .await
                    .unwrap();

                // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
                // by the `main_loop`.
                match to_main_rx1.recv().await {
                    Some(PeerTaskToMain::Transaction(_)) => (),
                    _ => panic!("Main loop must receive new transaction"),
                };
            }
        }
3856
    }
3857

3858
    mod block_proposals {
        use super::*;

        // Everything a block-proposal test needs: a ready-to-run handler, the
        // channels wired to it, and the genesis block for assertions. The
        // sender/broadcast handles are kept so tests control channel lifetime.
        struct TestSetup {
            peer_loop_handler: PeerLoopHandler,
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
            peer_state: MutablePeerState,
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
            genesis_block: Block,
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
        }

        // Build a node state at the genesis block with a single mocked peer
        // connection, ready for driving the peer loop in a test.
        async fn genesis_setup(network: Network) -> TestSetup {
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
                get_test_genesis_setup(network, 0, cli_args::Args::default())
                    .await
                    .unwrap();
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
            let peer_loop_handler = PeerLoopHandler::new(
                to_main_tx.clone(),
                alice.clone(),
                get_dummy_socket_address(0),
                peer_hsd,
                true,
                1,
            );
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);

            TestSetup {
                peer_broadcast_tx,
                peer_loop_handler,
                to_main_rx,
                from_main_rx,
                peer_state,
                to_main_tx,
                genesis_block: Block::genesis(network),
            }
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn accept_block_proposal_height_one() {
            // Node knows genesis block, receives a block proposal for block 1
            // and must accept this. Verify that main loop is informed of block
            // proposal.
            let TestSetup {
                peer_broadcast_tx,
                mut peer_loop_handler,
                mut to_main_rx,
                from_main_rx,
                mut peer_state,
                to_main_tx,
                genesis_block,
            } = genesis_setup(Network::Main).await;
            let block1 = fake_valid_block_for_tests(
                &peer_loop_handler.global_state_lock,
                StdRng::seed_from_u64(5550001).random(),
            )
            .await;

            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
                Action::Read(PeerMessage::Bye),
            ]);
            peer_loop_handler
                .run(mock, from_main_rx, &mut peer_state)
                .await
                .unwrap();

            // The proposal must have been forwarded to the main loop, and it
            // must build on the genesis block.
            match to_main_rx.try_recv().unwrap() {
                PeerTaskToMain::BlockProposal(block) => {
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
                }
                _ => panic!("Expected main loop to be informed of block proposal"),
            };

            // Keep the channel ends alive until the peer loop has finished.
            drop(to_main_tx);
            drop(peer_broadcast_tx);
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn accept_block_proposal_notification_height_one() {
            // Node knows genesis block, receives a block proposal notification
            // for block 1 and must accept this by requesting the block
            // proposal from peer.
            let TestSetup {
                peer_broadcast_tx,
                mut peer_loop_handler,
                to_main_rx: _,
                from_main_rx,
                mut peer_state,
                to_main_tx,
                ..
            } = genesis_setup(Network::Main).await;
            let block1 = fake_valid_block_for_tests(
                &peer_loop_handler.global_state_lock,
                StdRng::seed_from_u64(5550001).random(),
            )
            .await;

            // The notification must trigger a request for the full proposal,
            // identified by the block body's MAST hash.
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
                Action::Write(PeerMessage::BlockProposalRequest(
                    BlockProposalRequest::new(block1.body().mast_hash()),
                )),
                Action::Read(PeerMessage::Bye),
            ]);
            peer_loop_handler
                .run(mock, from_main_rx, &mut peer_state)
                .await
                .unwrap();

            // Keep the channel ends alive until the peer loop has finished.
            drop(to_main_tx);
            drop(peer_broadcast_tx);
        }
    }
3977

3978
    mod proof_qualities {
        use strum::IntoEnumIterator;

        use super::*;
        use crate::config_models::cli_args;
        use crate::models::blockchain::transaction::Transaction;
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
        use crate::models::state::wallet::transaction_output::TxOutput;
        use crate::tests::shared::globalstate::mock_genesis_global_state;

        // Create a transaction backed by a proof of the requested quality,
        // spending from the devnet wallet at genesis.
        async fn tx_of_proof_quality(
            network: Network,
            quality: TransactionProofQuality,
        ) -> Transaction {
            let wallet_secret = WalletEntropy::devnet_wallet();
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
            let alice_gsl = mock_genesis_global_state(
                1,
                wallet_secret,
                cli_args::Args::default_with_network(network),
            )
            .await;
            let alice = alice_gsl.lock_guard().await;
            let genesis_block = alice.chain.light_state();
            // Timestamp past the premine timelock so the input is spendable.
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
            let prover_capability = match quality {
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
            };
            let config = TxCreationConfig::default()
                .recover_change_off_chain(alice_key.into())
                .with_prover_capability(prover_capability);
            let block_height = genesis_block.header().height;
            let consensus_rule_set = ConsensusRuleSet::infer_from(network, block_height);
            // No explicit outputs: fee-only transaction with off-chain change.
            alice_gsl
                .api()
                .tx_initiator_internal()
                .create_transaction(
                    Vec::<TxOutput>::new().into(),
                    NativeCurrencyAmount::coins(1),
                    in_seven_months,
                    config,
                    consensus_rule_set,
                )
                .await
                .unwrap()
                .transaction()
                .clone()
        }

        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn client_favors_higher_proof_quality() {
            // In this scenario the peer is informed of a transaction that it
            // already knows, and it's tested that it checks the proof quality
            // field and verifies that it exceeds the proof in the mempool
            // before requesting the transaction.
            let network = Network::Main;
            let proof_collection_tx =
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
            let single_proof_tx =
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;

            // Exercise every (mempool quality, announced quality) combination.
            for (own_tx_pq, new_tx_pq) in
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
            {
                use TransactionProofQuality::*;

                let (
                    _peer_broadcast_tx,
                    from_main_rx_clone,
                    to_main_tx,
                    mut to_main_rx1,
                    mut alice,
                    handshake,
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
                    .await
                    .unwrap();

                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
                    (ProofCollection, ProofCollection) => {
                        (&proof_collection_tx, &proof_collection_tx)
                    }
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
                };

                alice
                    .lock_guard_mut()
                    .await
                    .mempool_insert((*own_tx).to_owned(), UpgradePriority::Irrelevant)
                    .await;

                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();

                // Only a strictly higher-quality proof should trigger a
                // transaction request; equal or lower quality is ignored.
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
                let mock = if own_proof_is_supreme {
                    Mock::new(vec![
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
                        Action::Read(PeerMessage::Bye),
                    ])
                } else {
                    Mock::new(vec![
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
                        Action::Read(PeerMessage::Transaction(Box::new(
                            new_tx.try_into().unwrap(),
                        ))),
                        Action::Read(PeerMessage::Bye),
                    ])
                };

                // Mock a timestamp at which the transactions are valid.
                let now = proof_collection_tx.kernel.timestamp;
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
                    to_main_tx,
                    alice.clone(),
                    get_dummy_socket_address(0),
                    handshake,
                    true,
                    1,
                    now,
                );
                let mut peer_state = MutablePeerState::new(handshake.tip_header.height);

                peer_loop_handler
                    .run(mock, from_main_rx_clone, &mut peer_state)
                    .await
                    .unwrap();

                if own_proof_is_supreme {
                    match to_main_rx1.try_recv() {
                        Err(TryRecvError::Empty) => (),
                        Err(TryRecvError::Disconnected) => {
                            panic!("to_main channel must still be open")
                        }
                        Ok(_) => panic!("to_main channel must be empty"),
                    }
                } else {
                    match to_main_rx1.try_recv() {
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
                        Err(TryRecvError::Disconnected) => {
                            panic!("to_main channel must still be open")
                        }
                        Ok(PeerTaskToMain::Transaction(_)) => (),
                        _ => panic!("Unexpected result from channel"),
                    }
                }
            }
        }
    }
4129

4130
    mod sync_challenges {
4131
        use super::*;
4132
        use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests_dyn;
4133

4134
        #[traced_test]
4135
        #[apply(shared_tokio_runtime)]
4136
        async fn bad_sync_challenge_height_greater_than_tip() {
4137
            // Criterium: Challenge height may not exceed that of tip in the
4138
            // request.
4139

4140
            let network = Network::Main;
4141
            let (
4142
                _alice_main_to_peer_tx,
4143
                alice_main_to_peer_rx,
4144
                alice_peer_to_main_tx,
4145
                alice_peer_to_main_rx,
4146
                mut alice,
4147
                alice_hsd,
4148
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
4149
                .await
4150
                .unwrap();
4151
            let genesis_block: Block = Block::genesis(network);
4152
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
4153
                &genesis_block,
4154
                Timestamp::hours(1),
4155
                Default::default(),
4156
                network,
4157
            )
4158
            .await;
4159
            for block in &blocks {
4160
                alice.set_new_tip(block.clone()).await.unwrap();
4161
            }
4162

4163
            let bh12 = blocks.last().unwrap().header().height;
4164
            let sync_challenge = SyncChallenge {
4165
                tip_digest: blocks[9].hash(),
4166
                challenges: [bh12; 10],
4167
            };
4168
            let alice_p2p_messages = Mock::new(vec![
4169
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4170
                Action::Read(PeerMessage::Bye),
4171
            ]);
4172

4173
            let peer_address = get_dummy_socket_address(0);
4174
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4175
                alice_peer_to_main_tx.clone(),
4176
                alice.clone(),
4177
                peer_address,
4178
                alice_hsd,
4179
                false,
4180
                1,
4181
            );
4182
            alice_peer_loop_handler
4183
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4184
                .await
4185
                .unwrap();
4186

4187
            drop(alice_peer_to_main_rx);
4188

4189
            let latest_sanction = alice
4190
                .lock_guard()
4191
                .await
4192
                .net
4193
                .get_peer_standing_from_database(peer_address.ip())
4194
                .await
4195
                .unwrap();
4196
            assert_eq!(
4197
                NegativePeerSanction::InvalidSyncChallenge,
4198
                latest_sanction
4199
                    .latest_punishment
4200
                    .expect("peer must be sanctioned")
4201
                    .0
4202
            );
4203
        }
4204

4205
        #[traced_test]
4206
        #[apply(shared_tokio_runtime)]
4207
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
4208
            // Criterium: Challenge may not point to genesis block, or block 1, as
4209
            // tip.
4210

4211
            let network = Network::Main;
4212
            let genesis_block: Block = Block::genesis(network);
4213

4214
            let alice_cli = cli_args::Args::default();
4215
            let (
4216
                _alice_main_to_peer_tx,
4217
                alice_main_to_peer_rx,
4218
                alice_peer_to_main_tx,
4219
                alice_peer_to_main_rx,
4220
                alice,
4221
                alice_hsd,
4222
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
4223

4224
            let sync_challenge = SyncChallenge {
4225
                tip_digest: genesis_block.hash(),
4226
                challenges: [BlockHeight::genesis(); 10],
4227
            };
4228

4229
            let alice_p2p_messages = Mock::new(vec![
4230
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4231
                Action::Read(PeerMessage::Bye),
4232
            ]);
4233

4234
            let peer_address = get_dummy_socket_address(0);
4235
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4236
                alice_peer_to_main_tx.clone(),
4237
                alice.clone(),
4238
                peer_address,
4239
                alice_hsd,
4240
                false,
4241
                1,
4242
            );
4243
            alice_peer_loop_handler
4244
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4245
                .await
4246
                .unwrap();
4247

4248
            drop(alice_peer_to_main_rx);
4249

4250
            let latest_sanction = alice
4251
                .lock_guard()
4252
                .await
4253
                .net
4254
                .get_peer_standing_from_database(peer_address.ip())
4255
                .await
4256
                .unwrap();
4257
            assert_eq!(
4258
                NegativePeerSanction::InvalidSyncChallenge,
4259
                latest_sanction
4260
                    .latest_punishment
4261
                    .expect("peer must be sanctioned")
4262
                    .0
4263
            );
4264
        }
4265

4266
        #[traced_test]
4267
        #[apply(shared_tokio_runtime)]
4268
        async fn sync_challenge_happy_path() -> Result<()> {
4269
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
4270
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
4271
            // sync mode.
4272

4273
            let mut rng = rand::rng();
4274
            let network = Network::Main;
4275
            let genesis_block: Block = Block::genesis(network);
4276

4277
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
4278
            let alice_cli = cli_args::Args {
4279
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
4280
                ..Default::default()
4281
            };
4282
            let (
4283
                _alice_main_to_peer_tx,
4284
                alice_main_to_peer_rx,
4285
                alice_peer_to_main_tx,
4286
                mut alice_peer_to_main_rx,
4287
                mut alice,
4288
                alice_hsd,
4289
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
4290
            let _alice_socket_address = get_dummy_socket_address(0);
4291

4292
            let (
4293
                _bob_main_to_peer_tx,
4294
                _bob_main_to_peer_rx,
4295
                _bob_peer_to_main_tx,
4296
                _bob_peer_to_main_rx,
4297
                mut bob,
4298
                _bob_hsd,
4299
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
4300
            let bob_socket_address = get_dummy_socket_address(0);
4301

4302
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
4303
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
4304
            assert!(
4305
                block_1.is_valid(&genesis_block, now, network).await,
4306
                "Block must be valid for this test to make sense"
4307
            );
4308
            let alice_tip = &block_1;
4309
            alice.set_new_tip(block_1.clone()).await?;
4310
            bob.set_new_tip(block_1.clone()).await?;
4311

4312
            // produce enough blocks to ensure alice needs to go into sync mode
4313
            // with this block notification.
4314
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
4315
                &block_1,
4316
                network.target_block_interval(),
4317
                (0..rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20))
4318
                    .map(|_| rng.random())
4319
                    .collect_vec(),
4320
                network,
4321
            )
4322
            .await;
4323
            for block in &blocks {
4324
                bob.set_new_tip(block.clone()).await?;
4325
            }
4326
            let bob_tip = blocks.last().unwrap();
4327

4328
            let block_notification_from_bob = PeerBlockNotification {
4329
                hash: bob_tip.hash(),
4330
                height: bob_tip.header().height,
4331
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
4332
            };
4333

4334
            let alice_rng_seed = rng.random::<[u8; 32]>();
4335
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
4336
            let sync_challenge_from_alice = SyncChallenge::generate(
4337
                &block_notification_from_bob,
4338
                alice_tip.header().height,
4339
                alice_rng_clone.random(),
4340
            );
4341

4342
            println!(
4343
                "sync challenge from alice:\n{:?}",
4344
                sync_challenge_from_alice
4345
            );
4346

4347
            let sync_challenge_response_from_bob = bob
4348
                .lock_guard()
4349
                .await
4350
                .response_to_sync_challenge(sync_challenge_from_alice)
4351
                .await
4352
                .expect("should be able to respond to sync challenge");
4353

4354
            let alice_p2p_messages = Mock::new(vec![
4355
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4356
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
4357
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
4358
                    sync_challenge_response_from_bob,
4359
                ))),
4360
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4361
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
4362
                // The absence of a Write here checks that a 2nd challenge isn't sent
4363
                // when a successful was just received.
4364
                Action::Read(PeerMessage::Bye),
4365
            ]);
4366

4367
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
4368
                alice_peer_to_main_tx.clone(),
4369
                alice.clone(),
4370
                bob_socket_address,
4371
                alice_hsd,
4372
                false,
4373
                1,
4374
                bob_tip.header().timestamp,
4375
            );
4376
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
4377
            alice_peer_loop_handler
4378
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4379
                .await?;
4380

4381
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
4382
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
4383
            expected_anchor_mmra.append(bob_tip.hash());
4384
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
4385
                peer_address: bob_socket_address,
4386
                claimed_height: bob_tip.header().height,
4387
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
4388
                claimed_block_mmra: expected_anchor_mmra,
4389
            };
4390
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
4391
            assert_eq!(
4392
                expected_message_from_alice_peer_loop,
4393
                observed_message_from_alice_peer_loop
4394
            );
4395

4396
            Ok(())
4397
        }
4398
    }
4399
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc