• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 16439574444

22 Jul 2025 08:51AM UTC coverage: 72.433% (-0.05%) from 72.479%
16439574444

push

github

aszepieniec
test: Increase timeout

Fix test `alice_updates_mutator_set_data_on_own_transaction` by
increasing the timeout from 30s to 45s. The test fails occasionally
on my (Alan's) local machine, particularly when some other costly
computation is running. On Github's CI machine it fails regularly.
With the larger timeout there is some more margin for slower
machines.

20663 of 28527 relevant lines covered (72.43%)

502432.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.44
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
// Number of blocks per batch response when the requester does not constrain
// the size. (NOTE(review): the consuming batch-request handler is not visible
// in this chunk — confirm usage there.)
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
// Cap on the number of peers revealed in a `PeerListResponse`; peers sending
// longer lists are sanctioned with `FloodPeerListResponse`.
const MAX_PEER_LIST_LENGTH: usize = 10;
// Smallest batch size a block-batch request may ask for. (NOTE(review):
// usage not visible in this chunk — confirm against the request handler.)
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;

// Named boolean return values for `handle_peer_message`: `true` tells the
// peer loop to disconnect, `false` keeps the connection open.
const KEEP_CONNECTION_ALIVE: bool = false;
const DISCONNECT_CONNECTION: bool = true;

// Numeric type in which peer standing scores are expressed.
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP
///
/// also handles messages from main task over the main-to-peer-tasks broadcast
/// channel.
#[derive(Debug, Clone)]
pub struct PeerLoopHandler {
    // Channel for passing results (new blocks, peer-discovery answers, etc.)
    // up to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
    // Handle to the node's shared global state; locked for read or write as
    // needed by the individual message handlers.
    global_state_lock: GlobalStateLock,
    // Network address of the connected peer; used in log messages and as the
    // key into the peer map.
    peer_address: SocketAddr,
    // Handshake data exchanged at connection setup. (Not read in this chunk.)
    peer_handshake_data: HandshakeData,
    // Presumably true when the peer initiated the connection to us — not
    // read in this chunk; confirm against the connection-setup code.
    inbound_connection: bool,
    // Hop distance for peer discovery; peers revealed by this peer are
    // reported to main with `distance + 1`.
    distance: u8,
    // Per-handler RNG, e.g. for generating sync challenges.
    rng: StdRng,
    // Tests only: when set, `now()` returns this fixed timestamp instead of
    // the wall clock.
    #[cfg(test)]
    mock_now: Option<Timestamp>,
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
28✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
28✔
94
        global_state_lock: GlobalStateLock,
28✔
95
        peer_address: SocketAddr,
28✔
96
        peer_handshake_data: HandshakeData,
28✔
97
        inbound_connection: bool,
28✔
98
        distance: u8,
28✔
99
    ) -> Self {
28✔
100
        Self {
28✔
101
            to_main_tx,
28✔
102
            global_state_lock,
28✔
103
            peer_address,
28✔
104
            peer_handshake_data,
28✔
105
            inbound_connection,
28✔
106
            distance,
28✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
28✔
108
            #[cfg(test)]
28✔
109
            mock_now: None,
28✔
110
        }
28✔
111
    }
28✔
112

113
    /// Allows for mocked timestamps such that time dependencies may be tested.
    ///
    /// Behaves exactly like [`Self::new`] except that `now()` will return
    /// `mocked_time` instead of the wall clock.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        // Delegate to `new` so the two constructors cannot drift apart; the
        // previous version duplicated the whole struct literal. Then install
        // the fixed timestamp.
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);
        handler
    }
135

136
    /// Replace this handler's random number generator with a specific one.
    ///
    /// Useful for derandomizing tests.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        self.rng = rng;
    }
143

144
    fn now(&self) -> Timestamp {
56✔
145
        #[cfg(not(test))]
146
        {
147
            Timestamp::now()
27✔
148
        }
149
        #[cfg(test)]
150
        {
151
            self.mock_now.unwrap_or(Timestamp::now())
29✔
152
        }
153
    }
56✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
199
        if sanction_result.is_err() {
18✔
200
            error!("Cannot reward banned peer");
×
201
        }
18✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
204
    }
19✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
    /// valid. Use with a list of blocks or a single block. When the
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
    /// list is the `i`th block. The parent of element zero in this list is
    /// `parent_of_first_block`.
    ///
    /// # Return Value
    ///  - `Err` when the connection should be closed;
    ///  - `Ok(None)` if some block is invalid
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
    ///    are not syncing;
    ///  - `Ok(None)` if the last block has insufficient height and we are
    ///    syncing;
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
    ///    highest height in the batch.
    ///
    /// A return value of Ok(Some(_)) means that the message was passed on to
    /// main loop.
    ///
    /// # Locking
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    ///
    /// # Panics
    ///
    ///  - Panics if called with the empty list.
    async fn handle_blocks(
        &mut self,
        received_blocks: Vec<Block>,
        parent_of_first_block: Block,
    ) -> Result<Option<BlockHeight>> {
        debug!(
            "attempting to validate {} {}",
            received_blocks.len(),
            if received_blocks.len() == 1 {
                "block"
            } else {
                "blocks"
            }
        );
        let now = self.now();
        debug!("validating with respect to current timestamp {now}");
        // Walk the batch in order; each block must have sufficient PoW
        // relative to, and be valid as a successor of, the previous one.
        let mut previous_block = &parent_of_first_block;
        for new_block in &received_blocks {
            let new_block_has_proof_of_work = new_block.has_proof_of_work(
                self.global_state_lock.cli().network,
                previous_block.header(),
            );
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
            let new_block_is_valid = new_block
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
                .await;
            debug!("new block is valid? {new_block_is_valid}");
            if !new_block_has_proof_of_work {
                warn!(
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
                warn!(
                    "Proof of work should be {} (or more) but was [{}].",
                    previous_block.kernel.header.difficulty.target(),
                    new_block.hash().values().iter().join(", ")
                );
                // Sanction the peer; `?` closes the connection if this ban
                // tipped the standing over the threshold.
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block due to insufficient PoW");
                return Ok(None);
            } else if !new_block_is_valid {
                warn!(
                    "Received invalid block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block: invalid block");
                return Ok(None);
            }
            info!(
                "Block with height {} is valid. mined: {}",
                new_block.kernel.header.height,
                new_block.kernel.header.timestamp.standard_format()
            );

            previous_block = new_block;
        }

        // evaluate the fork choice rule
        debug!("Checking last block's canonicity ...");
        // This `unwrap` is the documented panic on an empty input list.
        let last_block = received_blocks.last().unwrap();
        let is_canonical = self
            .global_state_lock
            .lock_guard()
            .await
            .incoming_block_is_more_canonical(last_block);
        let last_block_height = last_block.header().height;
        // NOTE(review): canonicity and the sync champion are read under two
        // separate lock acquisitions, so state may change in between —
        // confirm this race is acceptable.
        let sync_mode_active_and_have_new_champion = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .sync_anchor
            .as_ref()
            .is_some_and(|x| {
                x.champion
                    .is_none_or(|(height, _)| height < last_block_height)
            });
        if !is_canonical && !sync_mode_active_and_have_new_champion {
            warn!(
                "Received {} blocks from peer but incoming blocks are less \
            canonical than current tip, or current sync-champion.",
                received_blocks.len()
            );
            return Ok(None);
        }

        // Send the new blocks to the main task which handles the state update
        // and storage to the database.
        let number_of_received_blocks = received_blocks.len();
        self.to_main_tx
            .send(PeerTaskToMain::NewBlocks(received_blocks))
            .await?;
        info!(
            "Updated block info by block from peer. block height {}",
            last_block_height
        );

        // Valuable, new, hard-to-produce information. Reward peer.
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
            .await?;

        Ok(Some(last_block_height))
    }
393

394
    /// Take a single block received from a peer and (attempt to) find a path
    /// between the received block and a common ancestor stored in the blocks
    /// database.
    ///
    /// This function attempts to find the parent of the received block, either
    /// by searching the database or by requesting it from a peer.
    ///  - If the parent is not stored, it is requested from the peer and the
    ///    received block is pushed to the fork reconciliation list for later
    ///    handling by this function. The fork reconciliation list starts out
    ///    empty, but grows as more parents are requested and transmitted.
    ///  - If the parent is found in the database, a) block handling continues:
    ///    the entire list of fork reconciliation blocks are passed down the
    ///    pipeline, potentially leading to a state update; and b) the fork
    ///    reconciliation list is cleared.
    ///
    /// Locking:
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    async fn try_ensure_path<S>(
        &mut self,
        received_block: Box<Block>,
        peer: &mut S,
        peer_state: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        // Does the received block match the fork reconciliation list?
        // Blocks arrive newest-first, so the most recently stored block must
        // be a valid successor of the block just received.
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
            peer_state.fork_reconciliation_blocks.last()
        {
            let valid = successor
                .is_valid(
                    received_block.as_ref(),
                    self.now(),
                    self.global_state_lock.cli().network,
                )
                .await;
            if !valid {
                warn!(
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
                        peer_state.fork_reconciliation_blocks.len() + 1
                    );
            }
            valid
        } else {
            true
        };

        // Are we running out of RAM?
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
            >= self.global_state_lock.cli().sync_mode_threshold;
        if too_many_blocks {
            warn!(
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
                peer_state.fork_reconciliation_blocks.len() + 1
            );
        }

        // Block mismatch or too many blocks: abort!
        // The reconciliation list is discarded, so a later attempt starts
        // over from scratch.
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
            self.punish(NegativePeerSanction::ForkResolutionError((
                received_block.header().height,
                peer_state.fork_reconciliation_blocks.len() as u16,
                received_block.hash(),
            )))
            .await?;
            peer_state.fork_reconciliation_blocks = vec![];
            return Ok(());
        }

        // otherwise, append
        peer_state.fork_reconciliation_blocks.push(*received_block);

        // Try fetch parent
        // `unwrap` is safe: the block was pushed immediately above.
        let received_block_header = *peer_state
            .fork_reconciliation_blocks
            .last()
            .unwrap()
            .header();

        let parent_digest = received_block_header.prev_block_digest;
        let parent_height = received_block_header.height.previous()
            .expect("transferred block must have previous height because genesis block cannot be transferred");
        debug!("Try ensure path: fetching parent block");
        let parent_block = self
            .global_state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_block(parent_digest)
            .await?;
        debug!(
            "Completed parent block fetching from DB: {}",
            if parent_block.is_some() {
                "found".to_string()
            } else {
                "not found".to_string()
            }
        );

        // If parent is not known (but not genesis) request it.
        let Some(parent_block) = parent_block else {
            // A missing *genesis* parent means the peer is on a different
            // chain entirely; sanction instead of requesting further.
            if parent_height.is_genesis() {
                peer_state.fork_reconciliation_blocks.clear();
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
                return Ok(());
            }
            info!(
                "Parent not known: Requesting previous block with height {} from peer",
                parent_height
            );

            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
                .await?;

            return Ok(());
        };

        // We want to treat the received fork reconciliation blocks (plus the
        // received block) in reverse order, from oldest to newest, because
        // they were requested from high to low block height.
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
        new_blocks.reverse();

        // Reset the fork resolution state since we got all the way back to a
        // block that we have.
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
        peer_state.fork_reconciliation_blocks.clear();

        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
            // If `BlockNotification` was received during a block reconciliation
            // event, then the peer might have one (or more (unlikely)) blocks
            // that we do not have. We should thus request those blocks.
            if fork_reconciliation_event
                && peer_state.highest_shared_block_height > new_block_height
            {
                peer.send(PeerMessage::BlockRequestByHeight(
                    peer_state.highest_shared_block_height,
                ))
                .await?;
            }
        }

        Ok(())
    }
543

544
    /// Handle peer messages and returns Ok(true) if connection should be closed.
545
    /// Connection should also be closed if an error is returned.
546
    /// Otherwise, returns OK(false).
547
    ///
548
    /// Locking:
549
    ///   * Acquires `global_state_lock` for read.
550
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
551
    ///     `self.reward(..)`.
552
    async fn handle_peer_message<S>(
153✔
553
        &mut self,
153✔
554
        msg: PeerMessage,
153✔
555
        peer: &mut S,
153✔
556
        peer_state_info: &mut MutablePeerState,
153✔
557
    ) -> Result<bool>
153✔
558
    where
153✔
559
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
153✔
560
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
153✔
561
        <S as TryStream>::Error: std::error::Error,
153✔
562
    {
153✔
563
        debug!(
153✔
564
            "Received {} from peer {}",
×
565
            msg.get_type(),
×
566
            self.peer_address
567
        );
568
        match msg {
153✔
569
            PeerMessage::Bye => {
570
                // Note that the current peer is not removed from the global_state.peer_map here
571
                // but that this is done by the caller.
572
                info!("Got bye. Closing connection to peer");
42✔
573
                Ok(DISCONNECT_CONNECTION)
42✔
574
            }
575
            PeerMessage::PeerListRequest => {
576
                let peer_info = {
5✔
577
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
578

579
                    // We are interested in the address on which peers accept ingoing connections,
580
                    // not in the address in which they are connected to us. We are only interested in
581
                    // peers that accept incoming connections.
582
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
583
                        .global_state_lock
5✔
584
                        .lock_guard()
5✔
585
                        .await
5✔
586
                        .net
587
                        .peer_map
588
                        .values()
5✔
589
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
590
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
591
                        .map(|peer_info| {
8✔
592
                            (
8✔
593
                                // unwrap is safe bc of above `filter`
8✔
594
                                peer_info.listen_address().unwrap(),
8✔
595
                                peer_info.instance_id(),
8✔
596
                            )
8✔
597
                        })
8✔
598
                        .collect();
5✔
599

600
                    // We sort the returned list, so this function is easier to test
601
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
602
                    peer_info
5✔
603
                };
604

605
                debug!("Responding with: {:?}", peer_info);
5✔
606
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
607
                Ok(KEEP_CONNECTION_ALIVE)
5✔
608
            }
609
            PeerMessage::PeerListResponse(peers) => {
3✔
610
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
611

612
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
613
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
614
                        .await?;
×
615
                }
3✔
616
                self.to_main_tx
3✔
617
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
618
                        peers,
3✔
619
                        self.peer_address,
3✔
620
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
621
                        self.distance + 1,
3✔
622
                    )))
3✔
623
                    .await?;
3✔
624
                Ok(KEEP_CONNECTION_ALIVE)
3✔
625
            }
626
            PeerMessage::BlockNotificationRequest => {
627
                debug!("Got BlockNotificationRequest");
×
628

629
                peer.send(PeerMessage::BlockNotification(
×
630
                    self.global_state_lock
×
631
                        .lock_guard()
×
632
                        .await
×
633
                        .chain
634
                        .light_state()
×
635
                        .into(),
×
636
                ))
637
                .await?;
×
638

639
                Ok(KEEP_CONNECTION_ALIVE)
×
640
            }
641
            PeerMessage::BlockNotification(block_notification) => {
16✔
642
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
643

644
                let (tip_header, sync_anchor_is_set) = {
16✔
645
                    let state = self.global_state_lock.lock_guard().await;
16✔
646
                    (
16✔
647
                        *state.chain.light_state().header(),
16✔
648
                        state.net.sync_anchor.is_some(),
16✔
649
                    )
16✔
650
                };
651
                debug!(
16✔
652
                    "Got BlockNotification of height {}. Own height is {}",
×
653
                    block_notification.height, tip_header.height
654
                );
655

656
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
657
                let now = self.now();
16✔
658
                let time_since_latest_successful_challenge = peer_state_info
16✔
659
                    .successful_sync_challenge_response_time
16✔
660
                    .map(|then| now - then);
16✔
661
                let cooldown_expired = time_since_latest_successful_challenge
16✔
662
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
663
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
664
                    &tip_header,
16✔
665
                    block_notification.height,
16✔
666
                    block_notification.cumulative_proof_of_work,
16✔
667
                    sync_mode_threshold,
16✔
668
                );
669
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
670
                    debug!("sync mode criterion satisfied.");
1✔
671

672
                    if peer_state_info.sync_challenge.is_some() {
1✔
673
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
674
                        return Ok(KEEP_CONNECTION_ALIVE);
×
675
                    }
1✔
676

677
                    info!(
1✔
678
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
679
                    );
680
                    let challenge = SyncChallenge::generate(
1✔
681
                        &block_notification,
1✔
682
                        tip_header.height,
1✔
683
                        self.rng.random(),
1✔
684
                    );
685
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
686
                        challenge,
1✔
687
                        block_notification.cumulative_proof_of_work,
1✔
688
                        self.now(),
1✔
689
                    ));
1✔
690

691
                    debug!("sending challenge ...");
1✔
692
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
693

694
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
695
                }
15✔
696

697
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
698
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
699
                    < block_notification.cumulative_proof_of_work;
15✔
700

701
                debug!("block_is_new: {}", block_is_new);
15✔
702

703
                if block_is_new
15✔
704
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
705
                    && !sync_anchor_is_set
14✔
706
                    && !exceeds_sync_mode_threshold
14✔
707
                {
708
                    debug!(
13✔
709
                        "sending BlockRequestByHeight to peer for block with height {}",
×
710
                        block_notification.height
711
                    );
712
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
713
                        .await?;
13✔
714
                } else {
715
                    debug!(
2✔
716
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
717
                        block_notification.height,
718
                        block_is_new,
719
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
720
                    );
721
                }
722

723
                Ok(KEEP_CONNECTION_ALIVE)
15✔
724
            }
725
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
726
                let response = {
×
727
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
728

729
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
730

731
                    let response = self
2✔
732
                        .global_state_lock
2✔
733
                        .lock_guard()
2✔
734
                        .await
2✔
735
                        .response_to_sync_challenge(sync_challenge)
2✔
736
                        .await;
2✔
737

738
                    match response {
2✔
739
                        Ok(resp) => resp,
×
740
                        Err(e) => {
2✔
741
                            warn!("could not generate sync challenge response:\n{e}");
2✔
742
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
743
                                .await?;
2✔
744
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
745
                        }
746
                    }
747
                };
748

749
                info!(
×
750
                    "Responding to sync challenge from {}",
×
751
                    self.peer_address.ip()
×
752
                );
753
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
754
                    .await?;
×
755

756
                Ok(KEEP_CONNECTION_ALIVE)
×
757
            }
758
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
759
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
760

761
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
762
                info!(
1✔
763
                    "Got sync challenge response from {}",
×
764
                    self.peer_address.ip()
×
765
                );
766

767
                // The purpose of the sync challenge and sync challenge response
768
                // is to avoid going into sync mode based on a malicious target
769
                // fork. Instead of verifying that the claimed proof-of-work
770
                // number is correct (which would require sending and verifying,
771
                // at least, all blocks between luca (whatever that is) and the
772
                // claimed tip), we use a heuristic that requires less
773
                // communication and less verification work. The downside of
774
                // using a heuristic here is a nonzero false positive and false
775
                // negative rate. Note that the false negative event
776
                // (maliciously sending someone into sync mode based on a bogus
777
                // fork) still requires a significant amount of work from the
778
                // attacker, *in addition* to being lucky. Also, down the line
779
                // succinctness (and more specifically, recursive block
780
                // validation) obviates this entire subprotocol.
781

782
                // Did we issue a challenge?
783
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
784
                    warn!("Sync challenge response was not prompted.");
×
785
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
786
                        .await?;
×
787
                    return Ok(KEEP_CONNECTION_ALIVE);
×
788
                };
789

790
                // Reset the challenge, regardless of the response's success.
791
                peer_state_info.sync_challenge = None;
1✔
792

793
                // Does response match issued challenge?
794
                if !challenge_response
1✔
795
                    .matches(self.global_state_lock.cli().network, issued_challenge)
1✔
796
                {
797
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
798
                        .await?;
×
799
                    return Ok(KEEP_CONNECTION_ALIVE);
×
800
                }
1✔
801

802
                // Does response verify?
803
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
804
                let now = self.now();
1✔
805
                if !challenge_response
1✔
806
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
807
                    .await
1✔
808
                {
809
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
810
                        .await?;
×
811
                    return Ok(KEEP_CONNECTION_ALIVE);
×
812
                }
1✔
813

814
                // Does cumulative proof-of-work evolve reasonably?
815
                let own_tip_header = *self
1✔
816
                    .global_state_lock
1✔
817
                    .lock_guard()
1✔
818
                    .await
1✔
819
                    .chain
820
                    .light_state()
1✔
821
                    .header();
1✔
822
                if !challenge_response
1✔
823
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
824
                {
825
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
826
                        .await?;
×
827
                    return Ok(KEEP_CONNECTION_ALIVE);
×
828
                }
1✔
829

830
                // Is there some specific (*i.e.*, not aggregate) proof of work?
831
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
832
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
833
                        .await?;
×
834
                    return Ok(KEEP_CONNECTION_ALIVE);
×
835
                }
1✔
836

837
                // Did it come in time?
838
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
839
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
840
                        .await?;
×
841
                    return Ok(KEEP_CONNECTION_ALIVE);
×
842
                }
1✔
843

844
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
845
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
846

847
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
848
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
849

850
                // Inform main loop
851
                self.to_main_tx
1✔
852
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
853
                        peer_address: self.peer_address,
1✔
854
                        claimed_height: claimed_tip_height,
1✔
855
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
856
                        claimed_block_mmra: sync_mmra_anchor,
1✔
857
                    })
1✔
858
                    .await?;
1✔
859

860
                Ok(KEEP_CONNECTION_ALIVE)
1✔
861
            }
862
            PeerMessage::BlockRequestByHash(block_digest) => {
×
863
                let block = self
×
864
                    .global_state_lock
×
865
                    .lock_guard()
×
866
                    .await
×
867
                    .chain
868
                    .archival_state()
×
869
                    .get_block(block_digest)
×
870
                    .await?;
×
871

872
                match block {
×
873
                    None => {
874
                        // TODO: Consider punishing here
875
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
876
                        Ok(KEEP_CONNECTION_ALIVE)
×
877
                    }
878
                    Some(b) => {
×
879
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
880
                            .await?;
×
881
                        Ok(KEEP_CONNECTION_ALIVE)
×
882
                    }
883
                }
884
            }
885
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
886
                let block_response = {
15✔
887
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
888

889
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
890

891
                    let canonical_block_digest = self
16✔
892
                        .global_state_lock
16✔
893
                        .lock_guard()
16✔
894
                        .await
16✔
895
                        .chain
896
                        .archival_state()
16✔
897
                        .archival_block_mmr
898
                        .ammr()
16✔
899
                        .try_get_leaf(block_height.into())
16✔
900
                        .await;
16✔
901

902
                    let Some(canonical_block_digest) = canonical_block_digest else {
16✔
903
                        let own_tip_height = self
1✔
904
                            .global_state_lock
1✔
905
                            .lock_guard()
1✔
906
                            .await
1✔
907
                            .chain
908
                            .light_state()
1✔
909
                            .header()
1✔
910
                            .height;
911
                        warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
912
                        self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
913
                            .await?;
1✔
914

915
                        return Ok(KEEP_CONNECTION_ALIVE);
1✔
916
                    };
917

918
                    let canonical_chain_block: Block = self
15✔
919
                        .global_state_lock
15✔
920
                        .lock_guard()
15✔
921
                        .await
15✔
922
                        .chain
923
                        .archival_state()
15✔
924
                        .get_block(canonical_block_digest)
15✔
925
                        .await?
15✔
926
                        .unwrap();
15✔
927

928
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
929
                };
930

931
                debug!("Sending block");
15✔
932
                peer.send(block_response).await?;
15✔
933
                debug!("Sent block");
15✔
934
                Ok(KEEP_CONNECTION_ALIVE)
15✔
935
            }
936
            PeerMessage::Block(t_block) => {
32✔
937
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
938

939
                info!(
32✔
940
                    "Got new block from peer {}, height {}, mined {}",
×
941
                    self.peer_address,
942
                    t_block.header.height,
943
                    t_block.header.timestamp.standard_format()
×
944
                );
945
                let new_block_height = t_block.header.height;
32✔
946

947
                let block = match Block::try_from(*t_block) {
32✔
948
                    Ok(block) => Box::new(block),
32✔
949
                    Err(e) => {
×
950
                        warn!("Peer sent invalid block: {e:?}");
×
951
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
952
                            .await?;
×
953

954
                        return Ok(KEEP_CONNECTION_ALIVE);
×
955
                    }
956
                };
957

958
                // Update the value for the highest known height that peer possesses iff
959
                // we are not in a fork reconciliation state.
960
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
961
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
962
                }
22✔
963

964
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
965

966
                // Reward happens as part of `try_ensure_path`
967

968
                Ok(KEEP_CONNECTION_ALIVE)
31✔
969
            }
970
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
971
                known_blocks,
8✔
972
                max_response_len,
8✔
973
                anchor,
8✔
974
            }) => {
975
                debug!(
8✔
976
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
977
                    self.peer_address
978
                );
979

980
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
981
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
982
                        .await?;
×
983

984
                    return Ok(KEEP_CONNECTION_ALIVE);
×
985
                }
8✔
986

987
                // The last block in the list of the peers known block is the
988
                // earliest block, block with lowest height, the peer has
989
                // requested. If it does not belong to canonical chain, none of
990
                // the later will. So we can do an early abort in that case.
991
                let least_preferred = match known_blocks.last() {
8✔
992
                    Some(least_preferred) => *least_preferred,
8✔
993
                    None => {
994
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
995
                            .await?;
×
996

997
                        return Ok(KEEP_CONNECTION_ALIVE);
×
998
                    }
999
                };
1000

1001
                let state = self.global_state_lock.lock_guard().await;
8✔
1002
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1003
                let luca_is_known = state
8✔
1004
                    .chain
8✔
1005
                    .archival_state()
8✔
1006
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1007
                    .await;
8✔
1008
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1009
                    drop(state);
×
1010
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1011
                        .await?;
×
1012
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1013

1014
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1015
                }
8✔
1016

1017
                // Happy case: At least *one* of the blocks referenced by peer
1018
                // is known to us.
1019
                let first_block_in_response = {
8✔
1020
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1021
                    for block_digest in known_blocks {
10✔
1022
                        if state
10✔
1023
                            .chain
10✔
1024
                            .archival_state()
10✔
1025
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1026
                            .await
10✔
1027
                        {
1028
                            let height = state
8✔
1029
                                .chain
8✔
1030
                                .archival_state()
8✔
1031
                                .get_block_header(block_digest)
8✔
1032
                                .await
8✔
1033
                                .unwrap()
8✔
1034
                                .height;
1035
                            first_block_in_response = Some(height);
8✔
1036
                            debug!(
8✔
1037
                                "Found block in canonical chain for batch response: {}",
×
1038
                                block_digest
1039
                            );
1040
                            break;
8✔
1041
                        }
2✔
1042
                    }
1043

1044
                    first_block_in_response
8✔
1045
                        .expect("existence of LUCA should have been established already.")
8✔
1046
                };
1047

1048
                debug!(
8✔
1049
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1050
                 Now building response from that height."
×
1051
                );
1052

1053
                // Get the relevant blocks, at most batch-size many, descending from the
1054
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1055
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1056
                let max_response_len = cmp::min(
8✔
1057
                    max_response_len,
8✔
1058
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1059
                );
1060
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1061
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1062

1063
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1064
                let response_start_height: u64 = first_block_in_response.into();
8✔
1065
                let mut i: u64 = 1;
8✔
1066
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1067
                    let block_height = response_start_height + i;
31✔
1068
                    match state
31✔
1069
                        .chain
31✔
1070
                        .archival_state()
31✔
1071
                        .archival_block_mmr
31✔
1072
                        .ammr()
31✔
1073
                        .try_get_leaf(block_height)
31✔
1074
                        .await
31✔
1075
                    {
1076
                        Some(digest) => {
23✔
1077
                            digests_of_returned_blocks.push(digest);
23✔
1078
                        }
23✔
1079
                        None => break,
8✔
1080
                    }
1081
                    i += 1;
23✔
1082
                }
1083

1084
                let mut returned_blocks: Vec<Block> =
8✔
1085
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1086
                for block_digest in digests_of_returned_blocks {
31✔
1087
                    let block = state
23✔
1088
                        .chain
23✔
1089
                        .archival_state()
23✔
1090
                        .get_block(block_digest)
23✔
1091
                        .await?
23✔
1092
                        .unwrap();
23✔
1093
                    returned_blocks.push(block);
23✔
1094
                }
1095

1096
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1097

1098
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1099
                drop(state);
8✔
1100

1101
                let Some(response) = response else {
8✔
1102
                    warn!("Unable to satisfy batch-block request");
×
1103
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1104
                        .await?;
×
1105
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1106
                };
1107

1108
                debug!("Returning {} blocks in batch response", response.len());
8✔
1109

1110
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1111
                peer.send(response).await?;
8✔
1112

1113
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1114
            }
1115
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1116
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1117

1118
                debug!(
×
1119
                    "handling block response batch with {} blocks",
×
1120
                    authenticated_blocks.len()
×
1121
                );
1122

1123
                // (Alan:) why is there even a minimum?
1124
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1125
                    warn!("Got smaller batch response than allowed");
×
1126
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1127
                        .await?;
×
1128
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1129
                }
×
1130

1131
                // Verify that we are in fact in syncing mode
1132
                // TODO: Separate peer messages into those allowed under syncing
1133
                // and those that are not
1134
                let Some(sync_anchor) = self
×
1135
                    .global_state_lock
×
1136
                    .lock_guard()
×
1137
                    .await
×
1138
                    .net
1139
                    .sync_anchor
1140
                    .clone()
×
1141
                else {
1142
                    warn!("Received a batch of blocks without being in syncing mode");
×
1143
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1144
                        .await?;
×
1145
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1146
                };
1147

1148
                // Verify that the response matches the current state
1149
                // We get the latest block from the DB here since this message is
1150
                // only valid for archival nodes.
1151
                let (first_block, _) = &authenticated_blocks[0];
×
1152
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1153
                let most_canonical_own_block_match: Option<Block> = self
×
1154
                    .global_state_lock
×
1155
                    .lock_guard()
×
1156
                    .await
×
1157
                    .chain
1158
                    .archival_state()
×
1159
                    .get_block(first_blocks_parent_digest)
×
1160
                    .await
×
1161
                    .expect("Block lookup must succeed");
×
1162
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1163
                    Some(block) => block,
×
1164
                    None => {
1165
                        warn!("Got batch response with invalid start block");
×
1166
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1167
                            .await?;
×
1168
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1169
                    }
1170
                };
1171

1172
                // Convert all blocks to Block objects
1173
                debug!(
×
1174
                    "Found own block of height {} to match received batch",
×
1175
                    most_canonical_own_block_match.kernel.header.height
×
1176
                );
1177
                let mut received_blocks = vec![];
×
1178
                for (t_block, membership_proof) in authenticated_blocks {
×
1179
                    let Ok(block) = Block::try_from(t_block) else {
×
1180
                        warn!("Received invalid transfer block from peer");
×
1181
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1182
                            .await?;
×
1183
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1184
                    };
1185

1186
                    if !membership_proof.verify(
×
1187
                        block.header().height.into(),
×
1188
                        block.hash(),
×
1189
                        &sync_anchor.block_mmr.peaks(),
×
1190
                        sync_anchor.block_mmr.num_leafs(),
×
1191
                    ) {
×
1192
                        warn!("Authentication of received block fails relative to anchor");
×
1193
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1194
                            .await?;
×
1195
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1196
                    }
×
1197

1198
                    received_blocks.push(block);
×
1199
                }
1200

1201
                // Get the latest block that we know of and handle all received blocks
1202
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1203
                    .await?;
×
1204

1205
                // Reward happens as part of `handle_blocks`.
1206

1207
                Ok(KEEP_CONNECTION_ALIVE)
×
1208
            }
1209
            PeerMessage::UnableToSatisfyBatchRequest => {
1210
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1211
                warn!(
×
1212
                    "Peer {} reports inability to satisfy batch request.",
×
1213
                    self.peer_address
1214
                );
1215

1216
                Ok(KEEP_CONNECTION_ALIVE)
×
1217
            }
1218
            PeerMessage::Handshake(_) => {
1219
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1220

1221
                // The handshake should have been sent during connection
1222
                // initialization. Here it is out of order at best, malicious at
1223
                // worst.
1224
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1225
                Ok(KEEP_CONNECTION_ALIVE)
×
1226
            }
1227
            PeerMessage::ConnectionStatus(_) => {
1228
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1229

1230
                // The connection status should have been sent during connection
1231
                // initialization. Here it is out of order at best, malicious at
1232
                // worst.
1233

1234
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1235
                Ok(KEEP_CONNECTION_ALIVE)
×
1236
            }
1237
            PeerMessage::Transaction(transaction) => {
7✔
1238
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
7✔
1239

1240
                debug!(
7✔
1241
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1242
                    transaction.kernel.inputs.len(),
×
1243
                    transaction.kernel.outputs.len(),
×
1244
                    transaction.kernel.mutator_set_hash
×
1245
                );
1246

1247
                let transaction: Transaction = (*transaction).into();
7✔
1248

1249
                // 1. If transaction is invalid, punish.
1250
                if !transaction
7✔
1251
                    .is_valid(self.global_state_lock.cli().network)
7✔
1252
                    .await
7✔
1253
                {
1254
                    warn!("Received invalid tx");
×
1255
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1256
                        .await?;
×
1257
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1258
                }
7✔
1259

1260
                // 2. If transaction has coinbase, punish.
1261
                // Transactions received from peers have not been mined yet.
1262
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
1263
                if transaction.kernel.coinbase.is_some() {
7✔
1264
                    warn!("Received non-mined transaction with coinbase.");
×
1265
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1266
                        .await?;
×
1267
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1268
                }
7✔
1269

1270
                // 3. If negative fee, punish.
1271
                if transaction.kernel.fee.is_negative() {
7✔
1272
                    warn!("Received negative-fee transaction.");
×
1273
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1274
                        .await?;
×
1275
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1276
                }
7✔
1277

1278
                // 4. If transaction is already known, ignore.
1279
                if self
7✔
1280
                    .global_state_lock
7✔
1281
                    .lock_guard()
7✔
1282
                    .await
7✔
1283
                    .mempool
1284
                    .contains_with_higher_proof_quality(
7✔
1285
                        transaction.kernel.txid(),
7✔
1286
                        transaction.proof.proof_quality()?,
7✔
1287
                        transaction.kernel.mutator_set_hash,
7✔
1288
                    )
1289
                {
1290
                    warn!("Received transaction that was already known");
×
1291

1292
                    // We received a transaction that we *probably* haven't requested.
1293
                    // Consider punishing here, if this is abused.
1294
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1295
                }
7✔
1296

1297
                // 5. if transaction is not confirmable, punish.
1298
                let (tip, mutator_set_accumulator_after) = {
7✔
1299
                    let state = self.global_state_lock.lock_guard().await;
7✔
1300

1301
                    (
7✔
1302
                        state.chain.light_state().hash(),
7✔
1303
                        state
7✔
1304
                            .chain
7✔
1305
                            .light_state()
7✔
1306
                            .mutator_set_accumulator_after()
7✔
1307
                            .expect("Block from state must have mutator set after"),
7✔
1308
                    )
7✔
1309
                };
1310
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
7✔
1311
                    warn!(
×
1312
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1313
                        transaction.kernel.txid()
×
1314
                    );
1315
                    // get fine-grained error code for informative logging
1316
                    let confirmability_error_code = transaction
×
1317
                        .kernel
×
1318
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1319
                    match confirmability_error_code {
×
1320
                        Ok(_) => unreachable!(),
×
1321
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1322
                            warn!("invalid removal record (at index {index})");
×
1323
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1324
                            let removal_record_error_code = invalid_removal_record
×
1325
                                .validate_inner(&mutator_set_accumulator_after);
×
1326
                            debug!(
×
1327
                                "Absolute index set of removal record {index}: {:?}",
×
1328
                                invalid_removal_record.absolute_indices
1329
                            );
1330
                            match removal_record_error_code {
×
1331
                                Ok(_) => unreachable!(),
×
1332
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1333
                                    debug!("invalid because membership proof is missing");
×
1334
                                }
1335
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1336
                                    chunk_index,
×
1337
                                }) => {
1338
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1339
                                }
1340
                            };
1341
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1342
                                .await?;
×
1343
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1344
                        }
1345
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1346
                            warn!("duplicate inputs");
×
1347
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1348
                                .await?;
×
1349
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1350
                        }
1351
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1352
                            warn!("already spent input (at index {index})");
×
1353
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1354
                                .await?;
×
1355
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1356
                        }
1357
                    };
1358
                }
7✔
1359

1360
                // If transaction cannot be applied to mutator set, punish.
1361
                // I don't think this can happen when above checks pass but we include
1362
                // the check to ensure that transaction can be applied.
1363
                let ms_update = MutatorSetUpdate::new(
7✔
1364
                    transaction.kernel.inputs.clone(),
7✔
1365
                    transaction.kernel.outputs.clone(),
7✔
1366
                );
1367
                let can_apply = ms_update
7✔
1368
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
7✔
1369
                    .is_ok();
7✔
1370
                if !can_apply {
7✔
1371
                    warn!("Cannot apply transaction to current mutator set.");
×
1372
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1373
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1374
                        .await?;
×
1375
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1376
                }
7✔
1377

1378
                let tx_timestamp = transaction.kernel.timestamp;
7✔
1379

1380
                // 6. Ignore if transaction is too old
1381
                let now = self.now();
7✔
1382
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
7✔
1383
                    // TODO: Consider punishing here
1384
                    warn!("Received too old tx");
×
1385
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1386
                }
7✔
1387

1388
                // 7. Ignore if transaction is too far into the future
1389
                if tx_timestamp
7✔
1390
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
7✔
1391
                {
1392
                    // TODO: Consider punishing here
1393
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1394
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1395
                }
7✔
1396

1397
                // Otherwise, relay to main
1398
                let pt2m_transaction = PeerTaskToMainTransaction {
7✔
1399
                    transaction,
7✔
1400
                    confirmable_for_block: tip,
7✔
1401
                };
7✔
1402
                self.to_main_tx
7✔
1403
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
7✔
1404
                    .await?;
7✔
1405

1406
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1407
            }
1408
            PeerMessage::TransactionNotification(tx_notification) => {
14✔
1409
                // addresses #457
1410
                // new scope for state read-lock to avoid holding across peer.send()
1411
                {
1412
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
14✔
1413

1414
                    // 1. Ignore if we already know this transaction, and
1415
                    // the proof quality is not higher than what we already know.
1416
                    let state = self.global_state_lock.lock_guard().await;
14✔
1417
                    let transaction_of_same_or_higher_proof_quality_is_known =
14✔
1418
                        state.mempool.contains_with_higher_proof_quality(
14✔
1419
                            tx_notification.txid,
14✔
1420
                            tx_notification.proof_quality,
14✔
1421
                            tx_notification.mutator_set_hash,
14✔
1422
                        );
1423
                    if transaction_of_same_or_higher_proof_quality_is_known {
14✔
1424
                        debug!("transaction with same or higher proof quality was already known");
7✔
1425
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1426
                    }
7✔
1427

1428
                    // Only accept transactions that do not require executing
1429
                    // `update`.
1430
                    if state
7✔
1431
                        .chain
7✔
1432
                        .light_state()
7✔
1433
                        .mutator_set_accumulator_after()
7✔
1434
                        .expect("Block from state must have mutator set after")
7✔
1435
                        .hash()
7✔
1436
                        != tx_notification.mutator_set_hash
7✔
1437
                    {
1438
                        debug!("transaction refers to non-canonical mutator set state");
×
1439
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1440
                    }
7✔
1441
                }
1442

1443
                // 2. Request the actual `Transaction` from peer
1444
                debug!("requesting transaction from peer");
7✔
1445
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
7✔
1446
                    .await?;
7✔
1447

1448
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1449
            }
1450
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1451
                let state = self.global_state_lock.lock_guard().await;
5✔
1452
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1453
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1454
                };
1455

1456
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1457
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1458
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1459
                };
1460

1461
                // Drop state immediately to prevent holding over a response.
1462
                drop(state);
4✔
1463

1464
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1465
                    .await?;
4✔
1466

1467
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1468
            }
1469
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1470
                let verdict = self
1✔
1471
                    .global_state_lock
1✔
1472
                    .lock_guard()
1✔
1473
                    .await
1✔
1474
                    .favor_incoming_block_proposal_legacy(
1✔
1475
                        block_proposal_notification.height,
1✔
1476
                        block_proposal_notification.guesser_fee,
1✔
1477
                    );
1478
                match verdict {
1✔
1479
                    Ok(_) => {
1480
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1481
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1482
                        ))
1✔
1483
                        .await?
1✔
1484
                    }
1485
                    Err(reject_reason) => {
×
1486
                        info!(
×
1487
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1488
                        {}. Reason:\n{reject_reason}",
×
1489
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1490
                        self.peer_address
1491
                    )
1492
                    }
1493
                }
1494

1495
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1496
            }
1497
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1498
                let matching_proposal = self
×
1499
                    .global_state_lock
×
1500
                    .lock_guard()
×
1501
                    .await
×
1502
                    .mining_state
1503
                    .block_proposal
1504
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1505
                    .map(|x| x.to_owned());
×
1506
                if let Some(proposal) = matching_proposal {
×
1507
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1508
                        .await?;
×
1509
                } else {
1510
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1511
                        .await?;
×
1512
                }
1513

1514
                Ok(KEEP_CONNECTION_ALIVE)
×
1515
            }
1516
            PeerMessage::BlockProposal(new_proposal) => {
1✔
1517
                info!("Got block proposal from peer.");
1✔
1518

1519
                // Is the proposal valid?
1520
                // Lock needs to be held here because race conditions: otherwise
1521
                // the block proposal that was validated might not match with
1522
                // the one whose favorability is being computed.
1523
                let state = self.global_state_lock.lock_guard().await;
1✔
1524
                let tip = state.chain.light_state();
1✔
1525
                let proposal_is_valid = new_proposal
1✔
1526
                    .is_valid(tip, self.now(), self.global_state_lock.cli().network)
1✔
1527
                    .await;
1✔
1528
                if !proposal_is_valid {
1✔
1529
                    drop(state);
×
1530
                    self.punish(NegativePeerSanction::InvalidBlockProposal)
×
1531
                        .await?;
×
1532
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1533
                }
1✔
1534

1535
                // Is block proposal favorable?
1536
                let is_favorable = state.favor_incoming_block_proposal(
1✔
1537
                    new_proposal.header().prev_block_digest,
1✔
1538
                    new_proposal
1✔
1539
                        .total_guesser_reward()
1✔
1540
                        .expect("Block was validated"),
1✔
1541
                );
1✔
1542
                drop(state);
1✔
1543

1544
                if let Err(rejection_reason) = is_favorable {
1✔
1545
                    match rejection_reason {
×
1546
                        // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1547
                        BlockProposalRejectError::InsufficientFee { current, received }
×
1548
                            if Some(received) == current =>
×
1549
                        {
1550
                            debug!("ignoring new block proposal because the fee is equal to the present one");
×
1551
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1552
                        }
1553
                        _ => {
1554
                            warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1555
                            self.punish(NegativePeerSanction::NonFavorableBlockProposal)
×
1556
                                .await?;
×
1557
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1558
                        }
1559
                    }
1560
                };
1✔
1561

1562
                self.send_to_main(PeerTaskToMain::BlockProposal(new_proposal), line!())
1✔
1563
                    .await?;
1✔
1564

1565
                // Valuable, new, hard-to-produce information. Reward peer.
1566
                self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1567

1568
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1569
            }
1570
        }
1571
    }
153✔
1572

1573
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1574
    ///
1575
    /// the channel could potentially fill up in which case the send() will
1576
    /// block until there is capacity.  we wrap the send() so we can log if
1577
    /// that ever happens to the extent it passes slow-scope threshold.
1578
    async fn send_to_main(
1✔
1579
        &self,
1✔
1580
        msg: PeerTaskToMain,
1✔
1581
        line: u32,
1✔
1582
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1583
        // we measure across the send() in case the channel ever fills up.
1584
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1585

1586
        self.to_main_tx.send(msg).await
1✔
1587
    }
1✔
1588

1589
    /// Handle message from main task. The boolean return value indicates if
    /// the connection should be closed.
    ///
    /// Returns `Ok(true)` (i.e. `DISCONNECT_CONNECTION`) only for the
    /// `Disconnect`/`DisconnectAll` messages; every other message keeps the
    /// connection alive. Errors from sending on the peer sink are propagated
    /// to the caller via `?`.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write via Self::punish()
    async fn handle_main_task_message<S>(
        &mut self,
        msg: MainToPeerTask,
        peer: &mut S,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<bool>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        debug!("Handling {} message from main in peer loop", msg.get_type());
        match msg {
            MainToPeerTask::Block(block) => {
                // We don't currently differentiate whether a new block came from a peer, or from our
                // own miner. It's always shared through this logic.
                let new_block_height = block.kernel.header.height;
                // Only notify the peer if this block is higher than anything we
                // have already shared with them; avoids redundant notifications.
                if new_block_height > peer_state_info.highest_shared_block_height {
                    debug!("Sending PeerMessage::BlockNotification");
                    peer_state_info.highest_shared_block_height = new_block_height;
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
                        .await?;
                    debug!("Sent PeerMessage::BlockNotification");
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
                // Only ask one of the peers about the batch of blocks
                if batch_block_request.peer_addr_target != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                // Cap the requested batch size by the CLI-configured sync
                // threshold so we never ask for more than we are set up to handle.
                let max_response_len = std::cmp::min(
                    STANDARD_BLOCK_BATCH_SIZE,
                    self.global_state_lock.cli().sync_mode_threshold,
                );

                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: batch_block_request.known_blocks,
                    max_response_len,
                    anchor: batch_block_request.anchor_mmr,
                }))
                .await?;

                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");

                // The timeout message is broadcast to all peer loops; only the
                // loop owning the timed-out peer reacts to it.
                if self.peer_address != socket_addr {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }

                self.punish(NegativePeerSanction::SynchronizationTimeout)
                    .await?;

                // If this peer failed the last synchronization attempt, we only
                // sanction, we don't disconnect.
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::MakePeerDiscoveryRequest => {
                // Ask this peer for its list of known peers.
                peer.send(PeerMessage::PeerListRequest).await?;
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::Disconnect(peer_address) => {
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");

                // Only disconnect from the peer the main task requested a disconnect for.
                if peer_address != self.peer_address {
                    return Ok(KEEP_CONNECTION_ALIVE);
                }
                // Record the graceful disconnect in global state before closing.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::DisconnectAll() => {
                // Global disconnect: every peer loop records its own disconnect
                // and closes its connection.
                self.register_peer_disconnection().await;

                Ok(DISCONNECT_CONNECTION)
            }
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
                // Targeted variant of MakePeerDiscoveryRequest: only the loop
                // owning the target address sends the request.
                if target_socket_addr == self.peer_address {
                    peer.send(PeerMessage::PeerListRequest).await?;
                }
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::TransactionNotification(transaction_notification) => {
                // Relay a transaction notification from main out to this peer.
                debug!("Sending PeerMessage::TransactionNotification");
                peer.send(PeerMessage::TransactionNotification(
                    transaction_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::TransactionNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
                // Relay a block-proposal notification from main out to this peer.
                debug!("Sending PeerMessage::BlockProposalNotification");
                peer.send(PeerMessage::BlockProposalNotification(
                    block_proposal_notification,
                ))
                .await?;
                debug!("Sent PeerMessage::BlockProposalNotification");
                Ok(KEEP_CONNECTION_ALIVE)
            }
        }
    }
1700

1701
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
    /// or a message from main over the main-to-peer-tasks broadcast channel.
    ///
    /// Exits the loop (returning `Ok(())`) when the peer closes the connection
    /// or when a message handler requests disconnection; returns an error when
    /// the peer stream errors out or a peer-message handler fails.
    async fn run<S>(
        &mut self,
        mut peer: S,
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        loop {
            select! {
                // Handle peer messages
                peer_message = peer.try_next() => {
                    let peer_address = self.peer_address;
                    // Stream error => log and abort the connection with an error.
                    let peer_message = match peer_message {
                        Ok(message) => message,
                        Err(err) => {
                            let msg = format!("Error when receiving from peer: {peer_address}");
                            error!("{msg}. Error: {err}");
                            bail!("{msg}. Closing connection.");
                        }
                    };
                    // `None` means the peer closed the stream: exit gracefully.
                    let Some(peer_message) = peer_message else {
                        info!("Peer {peer_address} closed connection.");
                        break;
                    };

                    // Some message types are only meaningful in (or out of)
                    // sync mode; filter them based on current sync state.
                    let syncing =
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
                    let message_type = peer_message.get_type();
                    if peer_message.ignore_during_sync() && syncing {
                        debug!(
                            "Ignoring {message_type} message when syncing, from {peer_address}",
                        );
                        continue;
                    }
                    if peer_message.ignore_when_not_sync() && !syncing {
                        debug!(
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
                        );
                        continue;
                    }

                    // Dispatch; handler's boolean indicates whether to close
                    // the connection.
                    match self
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
                        .await
                    {
                        Ok(false) => {}
                        Ok(true) => {
                            info!("Closing connection to {peer_address}");
                            break;
                        }
                        Err(err) => {
                            warn!("Closing connection to {peer_address} because of error {err}.");
                            bail!("{err}");
                        }
                    };
                }

                // Handle messages from main task
                main_msg_res = from_main_rx.recv() => {
                    // A broken main->peer broadcast channel is unrecoverable;
                    // panic the task rather than limp on.
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
                        let err_msg = format!("Failed to read from main loop: {err}");
                        error!(err_msg);
                        panic!("{err_msg}");
                    });
                    // Errors from the main-message handler close the connection
                    // (mapped to `true`) instead of propagating.
                    let close_connection = self
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
                        .await
                        .unwrap_or_else(|err| {
                            warn!("handle_main_task_message returned an error: {err}");
                            true
                        });

                    if close_connection {
                        info!(
                            "handle_main_task_message is closing the connection to {}",
                            self.peer_address
                        );
                        break;
                    }
                }
            }
        }

        Ok(())
    }
1792

1793
    /// Function called before entering the peer loop. Reads the potentially stored
    /// peer standing from the database and does other book-keeping before entering
    /// its final resting place: the `peer_loop`. Note that the peer has already been
    /// accepted for a connection for this loop to be entered. So we don't need
    /// to check the standing again.
    ///
    /// Registers the peer in the peer map (re-checking connection limits under
    /// a write lock), runs the peer loop, and performs cleanup via
    /// `close_peer_connected_callback` when the loop exits — regardless of
    /// whether the loop exited with an error.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for write
    pub(crate) async fn run_wrapper<S>(
        &mut self,
        mut peer: S,
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        // Clock disagreement beyond this many seconds (either direction) is logged.
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;

        let cli_args = self.global_state_lock.cli().clone();

        // Restore previously recorded standing for this IP, or start fresh.
        let standing = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_databases
            .peer_standings
            .get(self.peer_address.ip())
            .await
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));

        // Add peer to peer map
        let peer_connection_info = PeerConnectionInfo::new(
            self.peer_handshake_data.listen_port,
            self.peer_address,
            self.inbound_connection,
        );
        let new_peer = PeerInfo::new(
            peer_connection_info,
            &self.peer_handshake_data,
            SystemTime::now(),
            cli_args.peer_tolerance,
        )
        .with_standing(standing);

        // If timestamps are different, we currently just log a warning.
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
        {
            let own_datetime_utc: DateTime<Utc> =
                new_peer.own_timestamp_connection_established.into();
            let peer_datetime_utc: DateTime<Utc> =
                new_peer.peer_timestamp_connection_established.into();
            warn!(
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
                new_peer.connected_address(),
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
        }

        // Multiple tasks might attempt to set up a connection concurrently. So
        // even though we've checked that this connection is allowed, this check
        // could have been invalidated by another task, for one accepting an
        // incoming connection from a peer we're currently connecting to. So we
        // need to make the check again while holding a write-lock, since
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
        // since it would have to be dropped before acquiring the write-lock.
        {
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
            let peer_map = &mut global_state.net.peer_map;
            // Reject a second connection to the same node (by instance ID).
            if peer_map
                .values()
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
            {
                bail!("Attempted to connect to already connected peer. Aborting connection.");
            }

            if peer_map.len() >= cli_args.max_num_peers {
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
            }

            if peer_map.contains_key(&self.peer_address) {
                // This shouldn't be possible, unless the peer reports a different instance ID than
                // for the other connection. Only a malignant client would do that.
                bail!("Already connected to peer. Aborting connection");
            }

            peer_map.insert(self.peer_address, new_peer);
        }

        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);

        // If peer indicates more canonical block, request a block notification to catch up ASAP
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
            > self
                .global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work
        {
            // Send block notification request to catch up ASAP, in case we're
            // behind the newly-connected peer.
            peer.send(PeerMessage::BlockNotificationRequest).await?;
        }

        let res = self.run(peer, from_main_rx, &mut peer_state).await;
        debug!("Exited peer loop for {}", self.peer_address);

        // Cleanup (e.g. peer-map removal) runs even when `run` returned an error.
        close_peer_connected_callback(
            self.global_state_lock.clone(),
            self.peer_address,
            &self.to_main_tx,
        )
        .await;

        debug!("Ending peer loop for {}", self.peer_address);

        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
        // feature to have for testing purposes.
        res
    }
1923

1924
    /// Register graceful peer disconnection in the global state.
1925
    ///
1926
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1927
    ///
1928
    /// # Locking:
1929
    ///   * acquires `global_state_lock` for write
1930
    ///
1931
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1932
    async fn register_peer_disconnection(&mut self) {
×
1933
        let peer_id = self.peer_handshake_data.instance_id;
×
1934
        self.global_state_lock
×
1935
            .lock_guard_mut()
×
1936
            .await
×
1937
            .net
1938
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1939
    }
×
1940
}
1941

1942
#[cfg(test)]
1943
#[cfg_attr(coverage_nightly, coverage(off))]
1944
mod tests {
1945
    use macro_rules_attr::apply;
1946
    use rand::rngs::StdRng;
1947
    use rand::Rng;
1948
    use rand::SeedableRng;
1949
    use tokio::sync::mpsc::error::TryRecvError;
1950
    use tracing_test::traced_test;
1951

1952
    use super::*;
1953
    use crate::config_models::cli_args;
1954
    use crate::config_models::network::Network;
1955
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1956
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1957
    use crate::models::peer::transaction_notification::TransactionNotification;
1958
    use crate::models::state::mempool::upgrade_priority::UpgradePriority;
1959
    use crate::models::state::tx_creation_config::TxCreationConfig;
1960
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1961
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1962
    use crate::tests::shared::blocks::fake_valid_block_for_tests;
1963
    use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests;
1964
    use crate::tests::shared::globalstate::get_dummy_handshake_data_for_genesis;
1965
    use crate::tests::shared::globalstate::get_dummy_peer_connection_data_genesis;
1966
    use crate::tests::shared::globalstate::get_dummy_socket_address;
1967
    use crate::tests::shared::globalstate::get_test_genesis_setup;
1968
    use crate::tests::shared::mock_tx::invalid_empty_single_proof_transaction;
1969
    use crate::tests::shared::Action;
1970
    use crate::tests::shared::Mock;
1971
    use crate::tests::shared_tokio_runtime;
1972

1973
    #[traced_test]
1974
    #[apply(shared_tokio_runtime)]
1975
    async fn test_peer_loop_bye() -> Result<()> {
1976
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1977

1978
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1979
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default()).await?;
1980

1981
        let peer_address = get_dummy_socket_address(2);
1982
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1983
        let mut peer_loop_handler =
1984
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1985
        peer_loop_handler
1986
            .run_wrapper(mock, from_main_rx_clone)
1987
            .await?;
1988

1989
        assert_eq!(
1990
            2,
1991
            state_lock.lock_guard().await.net.peer_map.len(),
1992
            "peer map length must be back to 2 after goodbye"
1993
        );
1994

1995
        Ok(())
1996
    }
1997

1998
    #[traced_test]
    #[apply(shared_tokio_runtime)]
    async fn test_peer_loop_peer_list() {
        // A `PeerListRequest` must be answered with a `PeerListResponse`
        // listing all connected peers: the two pre-existing connections plus
        // the requesting peer itself.
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default())
                .await
                .unwrap();

        // Sort by address so the expected response order is deterministic.
        let mut peer_infos = state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .clone()
            .into_values()
            .collect::<Vec<_>>();
        peer_infos.sort_by_cached_key(|x| x.connected_address());
        let (peer_address0, instance_id0) = (
            peer_infos[0].connected_address(),
            peer_infos[0].instance_id(),
        );
        let (peer_address1, instance_id1) = (
            peer_infos[1].connected_address(),
            peer_infos[1].instance_id(),
        );

        // Third peer: the one whose loop is driven by the mocked transcript.
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Beta, 2);
        let expected_response = vec![
            (peer_address0, instance_id0),
            (peer_address1, instance_id1),
            (sa2, hsd2.instance_id),
        ];
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::PeerListRequest),
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        let mut peer_loop_handler =
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        assert_eq!(
            2,
            state_lock.lock_guard().await.net.peer_map.len(),
            "peer map must have length 2 after saying goodbye to peer 2"
        );
    }
2051

2052
    #[traced_test]
2053
    #[apply(shared_tokio_runtime)]
2054
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
2055
    {
2056
        let args = cli_args::Args::default();
2057
        let network = args.network;
2058
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
2059
            get_test_genesis_setup(network, 0, args).await?;
2060

2061
        let peer_address = get_dummy_socket_address(0);
2062
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
2063
        let peer_id = peer_handshake_data.instance_id;
2064
        let mut peer_loop_handler = PeerLoopHandler::new(
2065
            to_main_tx,
2066
            state_lock.clone(),
2067
            peer_address,
2068
            peer_handshake_data,
2069
            true,
2070
            1,
2071
        );
2072
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2073
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
2074

2075
        let global_state = state_lock.lock_guard().await;
2076
        assert!(global_state
2077
            .net
2078
            .last_disconnection_time_of_peer(peer_id)
2079
            .is_none());
2080

2081
        drop(to_main_rx);
2082
        drop(from_main_tx);
2083

2084
        Ok(())
2085
    }
2086

2087
    mod blocks {
2088
        use super::*;
2089

2090
        #[traced_test]
2091
        #[apply(shared_tokio_runtime)]
2092
        async fn different_genesis_test() -> Result<()> {
2093
            // In this scenario a peer provides another genesis block than what has been
2094
            // hardcoded. This should lead to the closing of the connection to this peer
2095
            // and a ban.
2096

2097
            let network = Network::Main;
2098
            let (
2099
                _peer_broadcast_tx,
2100
                from_main_rx_clone,
2101
                to_main_tx,
2102
                mut to_main_rx1,
2103
                state_lock,
2104
                hsd,
2105
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2106
            assert_eq!(1000, state_lock.cli().peer_tolerance);
2107
            let peer_address = get_dummy_socket_address(0);
2108

2109
            // Although the database is empty, `get_latest_block` still returns the genesis block,
2110
            // since that block is hardcoded.
2111
            let mut different_genesis_block = state_lock
2112
                .lock_guard()
2113
                .await
2114
                .chain
2115
                .archival_state()
2116
                .get_tip()
2117
                .await;
2118

2119
            different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2120
            let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2121
                &different_genesis_block,
2122
                Timestamp::hours(1),
2123
                StdRng::seed_from_u64(5550001).random(),
2124
                network,
2125
            )
2126
            .await;
2127
            let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2128
                block_1_with_different_genesis.try_into().unwrap(),
2129
            )))]);
2130

2131
            let mut peer_loop_handler = PeerLoopHandler::new(
2132
                to_main_tx.clone(),
2133
                state_lock.clone(),
2134
                peer_address,
2135
                hsd,
2136
                true,
2137
                1,
2138
            );
2139
            let res = peer_loop_handler
2140
                .run_wrapper(mock, from_main_rx_clone)
2141
                .await;
2142
            assert!(
2143
                res.is_err(),
2144
                "run_wrapper must return failure when genesis is different"
2145
            );
2146

2147
            match to_main_rx1.recv().await {
2148
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2149
                _ => bail!("Must receive remove of peer block max height"),
2150
            }
2151

2152
            // Verify that no further message was sent to main loop
2153
            match to_main_rx1.try_recv() {
2154
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2155
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2156
            };
2157

2158
            drop(to_main_tx);
2159

2160
            let peer_standing = state_lock
2161
                .lock_guard()
2162
                .await
2163
                .net
2164
                .get_peer_standing_from_database(peer_address.ip())
2165
                .await;
2166
            assert_eq!(
2167
                -i32::from(state_lock.cli().peer_tolerance),
2168
                peer_standing.unwrap().standing
2169
            );
2170
            assert_eq!(
2171
                NegativePeerSanction::DifferentGenesis,
2172
                peer_standing.unwrap().latest_punishment.unwrap().0
2173
            );
2174

2175
            Ok(())
2176
        }
2177

2178
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_without_valid_pow_test() -> Result<()> {
            // In this scenario, a block without a valid PoW is received. This block should be rejected
            // by the peer loop and a notification should never reach the main loop.

            let network = Network::Main;
            let (
                peer_broadcast_tx,
                _from_main_rx_clone,
                to_main_tx,
                mut to_main_rx1,
                state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let peer_address = get_dummy_socket_address(0);
            let genesis_block: Block = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .get_tip()
                .await;

            // Make a block whose hash lies above the threshold implied by the
            // difficulty, i.e. a block with invalid proof-of-work.
            let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;

            // This *probably* is invalid PoW -- and needs to be for this test to
            // work.
            block_without_valid_pow.set_header_nonce(Digest::default());

            // Sending an invalid block will not necessarily result in a ban. This depends on the peer
            // tolerance that is set in the client. For this reason, we include a "Bye" here.
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::Block(Box::new(
                    block_without_valid_pow.clone().try_into().unwrap(),
                ))),
                Action::Read(PeerMessage::Bye),
            ]);

            let from_main_rx_clone = peer_broadcast_tx.subscribe();

            // Mock "now" to the block's timestamp so it is not rejected as stale.
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                true,
                1,
                block_without_valid_pow.header().timestamp,
            );
            peer_loop_handler
                .run_wrapper(mock, from_main_rx_clone)
                .await
                .expect("sending (one) invalid block should not result in closed connection");

            match to_main_rx1.recv().await {
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
                _ => bail!("Must receive remove of peer block max height"),
            }

            // Verify that no further message was sent to main loop
            match to_main_rx1.try_recv() {
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
            };

            // We need to have the transmitter in scope until we have received from it
            // otherwise the receiver will report the disconnected error when we attempt
            // to read from it. And the purpose is to verify that the channel is empty,
            // not that it has been closed.
            drop(to_main_tx);

            // Verify that peer standing was stored in database
            let standing = state_lock
                .lock_guard()
                .await
                .net
                .peer_databases
                .peer_standings
                .get(peer_address.ip())
                .await
                .unwrap();
            assert!(
                standing.standing < 0,
                "Peer must be sanctioned for sending a bad block"
            );

            Ok(())
        }
2274

2275
        #[traced_test]
2276
        #[apply(shared_tokio_runtime)]
2277
        async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2278
            // The scenario tested here is that a client receives a block that is already
2279
            // known and stored. The expected behavior is to ignore the block and not send
2280
            // a message to the main task.
2281

2282
            let network = Network::Main;
2283
            let (
2284
                peer_broadcast_tx,
2285
                _from_main_rx_clone,
2286
                to_main_tx,
2287
                mut to_main_rx1,
2288
                mut alice,
2289
                hsd,
2290
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2291
            let peer_address = get_dummy_socket_address(0);
2292
            let genesis_block: Block = Block::genesis(network);
2293

2294
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
2295
            let block_1 =
2296
                fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2297
            assert!(
2298
                block_1.is_valid(&genesis_block, now, network).await,
2299
                "Block must be valid for this test to make sense"
2300
            );
2301
            alice.set_new_tip(block_1.clone()).await?;
2302

2303
            let mock_peer_messages = Mock::new(vec![
2304
                Action::Read(PeerMessage::Block(Box::new(
2305
                    block_1.clone().try_into().unwrap(),
2306
                ))),
2307
                Action::Read(PeerMessage::Bye),
2308
            ]);
2309

2310
            let from_main_rx_clone = peer_broadcast_tx.subscribe();
2311

2312
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2313
                to_main_tx.clone(),
2314
                alice.clone(),
2315
                peer_address,
2316
                hsd,
2317
                false,
2318
                1,
2319
                block_1.header().timestamp,
2320
            );
2321
            alice_peer_loop_handler
2322
                .run_wrapper(mock_peer_messages, from_main_rx_clone)
2323
                .await?;
2324

2325
            match to_main_rx1.recv().await {
2326
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2327
                other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2328
            }
2329
            match to_main_rx1.try_recv() {
2330
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2331
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2332
            };
2333
            drop(to_main_tx);
2334

2335
            if !alice.lock_guard().await.net.peer_map.is_empty() {
2336
                bail!("peer map must be empty after closing connection gracefully");
2337
            }
2338

2339
            Ok(())
2340
        }
2341

2342
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_request_batch_simple() {
            // Scenario: Six blocks (including genesis) are known. Peer requests
            // from all possible starting points, and client responds with the
            // correct list of blocks.
            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                _to_main_rx1,
                mut state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
                .await
                .unwrap();
            let genesis_block: Block = Block::genesis(network);
            let peer_address = get_dummy_socket_address(0);
            let [block_1, block_2, block_3, block_4, block_5] =
                fake_valid_sequence_of_blocks_for_tests(
                    &genesis_block,
                    Timestamp::hours(1),
                    StdRng::seed_from_u64(5550001).random(),
                    network,
                )
                .await;
            let blocks = vec![
                genesis_block,
                block_1,
                block_2,
                block_3,
                block_4,
                block_5.clone(),
            ];
            // Store blocks 1..=5; genesis is already known.
            for block in blocks.iter().skip(1) {
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
            }

            // MMR accumulator over the canonical chain, used as the sync anchor.
            let mmra = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .archival_block_mmr
                .ammr()
                .to_accumulator_async()
                .await;
            // Request a batch from every possible starting block (heights 0..=4).
            for i in 0..=4 {
                let expected_response = {
                    let state = state_lock.lock_guard().await;
                    let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
                    PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
                        .await
                        .unwrap()
                };
                let mock = Mock::new(vec![
                    Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                        known_blocks: vec![blocks[i].hash()],
                        max_response_len: 14,
                        anchor: mmra.clone(),
                    })),
                    Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
                    Action::Read(PeerMessage::Bye),
                ]);
                let mut peer_loop_handler = PeerLoopHandler::new(
                    to_main_tx.clone(),
                    state_lock.clone(),
                    peer_address,
                    hsd.clone(),
                    false,
                    1,
                );

                peer_loop_handler
                    .run_wrapper(mock, from_main_rx_clone.resubscribe())
                    .await
                    .unwrap();
            }
        }
2422

2423
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_request_batch_in_order_test() -> Result<()> {
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
            // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
            // are returned.

            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                _to_main_rx1,
                mut state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let genesis_block: Block = Block::genesis(network);
            let peer_address = get_dummy_socket_address(0);
            // Chain a: genesis -> 1 -> 2_a -> 3_a
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;
            // Chain b forks off after block 1: 1 -> 2_b -> 3_b
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
                &block_1,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550002).random(),
                network,
            )
            .await;
            assert_ne!(block_2_b.hash(), block_2_a.hash());

            // Apply in this order so chain a (ending in 3_a) is the final tip.
            state_lock.set_new_tip(block_1.clone()).await?;
            state_lock.set_new_tip(block_2_a.clone()).await?;
            state_lock.set_new_tip(block_2_b.clone()).await?;
            state_lock.set_new_tip(block_3_b.clone()).await?;
            state_lock.set_new_tip(block_3_a.clone()).await?;

            let anchor = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .archival_block_mmr
                .ammr()
                .to_accumulator_async()
                .await;
            let response_1 = {
                let state_lock = state_lock.lock_guard().await;
                PeerLoopHandler::batch_response(
                    &state_lock,
                    vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
                    &anchor,
                )
                .await
                .unwrap()
            };

            let mut mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![genesis_block.hash()],
                    max_response_len: 14,
                    anchor: anchor.clone(),
                })),
                Action::Write(PeerMessage::BlockResponseBatch(response_1)),
                Action::Read(PeerMessage::Bye),
            ]);

            let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd.clone(),
                false,
                1,
                block_3_a.header().timestamp,
            );

            peer_loop_handler_1
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
                .await?;

            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
            let response_2 = {
                let state_lock = state_lock.lock_guard().await;
                PeerLoopHandler::batch_response(
                    &state_lock,
                    vec![block_2_a, block_3_a.clone()],
                    &anchor,
                )
                .await
                .unwrap()
            };
            mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
                    max_response_len: 14,
                    anchor,
                })),
                Action::Write(PeerMessage::BlockResponseBatch(response_2)),
                Action::Read(PeerMessage::Bye),
            ]);

            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                false,
                1,
                block_3_a.header().timestamp,
            );

            peer_loop_handler_2
                .run_wrapper(mock, from_main_rx_clone)
                .await?;

            Ok(())
        }
2544

2545
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn block_request_batch_out_of_order_test() -> Result<()> {
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
            // A peer requests a batch of blocks starting from block 1, but the peer supplies their
            // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
            // The blocks will be supplied in the correct order but starting from the first digest in
            // the list that is known and canonical.

            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                _to_main_rx1,
                mut state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let genesis_block = Block::genesis(network);
            let peer_address = get_dummy_socket_address(0);
            // Chain a: genesis -> 1 -> 2_a -> 3_a
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;
            // Chain b forks off after block 1: 1 -> 2_b -> 3_b
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
                &block_1,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550002).random(),
                network,
            )
            .await;
            assert_ne!(block_2_a.hash(), block_2_b.hash());

            // Apply in this order so chain a (ending in 3_a) is the final tip.
            state_lock.set_new_tip(block_1.clone()).await?;
            state_lock.set_new_tip(block_2_a.clone()).await?;
            state_lock.set_new_tip(block_2_b.clone()).await?;
            state_lock.set_new_tip(block_3_b.clone()).await?;
            state_lock.set_new_tip(block_3_a.clone()).await?;

            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
            let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
            expected_anchor.append(block_3_a.hash());
            let state_anchor = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .archival_block_mmr
                .ammr()
                .to_accumulator_async()
                .await;
            assert_eq!(
                expected_anchor, state_anchor,
                "Catching assumption about MMRA in tip and in archival state"
            );

            let response = {
                let state_lock = state_lock.lock_guard().await;
                PeerLoopHandler::batch_response(
                    &state_lock,
                    vec![block_1.clone(), block_2_a, block_3_a.clone()],
                    &expected_anchor,
                )
                .await
                .unwrap()
            };
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
                    max_response_len: 14,
                    anchor: expected_anchor,
                })),
                // Since genesis block is the 1st known in the list of known blocks,
                // its immediate descendant, block_1, is the first one returned.
                Action::Write(PeerMessage::BlockResponseBatch(response)),
                Action::Read(PeerMessage::Bye),
            ]);

            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                false,
                1,
                block_3_a.header().timestamp,
            );

            peer_loop_handler_2
                .run_wrapper(mock, from_main_rx_clone)
                .await?;

            Ok(())
        }
2642

2643
        #[traced_test]
2644
        #[apply(shared_tokio_runtime)]
2645
        async fn request_unknown_height_doesnt_crash() {
2646
            // Scenario: Only genesis block is known. Peer requests block of height
2647
            // 2.
2648
            let network = Network::Main;
2649
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2650
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2651
                    .await
2652
                    .unwrap();
2653
            let peer_address = get_dummy_socket_address(0);
2654
            let mock = Mock::new(vec![
2655
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2656
                Action::Read(PeerMessage::Bye),
2657
            ]);
2658

2659
            let mut peer_loop_handler = PeerLoopHandler::new(
2660
                to_main_tx.clone(),
2661
                state_lock.clone(),
2662
                peer_address,
2663
                hsd,
2664
                false,
2665
                1,
2666
            );
2667

2668
            // This will return error if seen read/write order does not match that of the
2669
            // mocked object.
2670
            peer_loop_handler
2671
                .run_wrapper(mock, from_main_rx_clone)
2672
                .await
2673
                .unwrap();
2674

2675
            // Verify that peer is sanctioned for this nonsense.
2676
            assert!(state_lock
2677
                .lock_guard()
2678
                .await
2679
                .net
2680
                .get_peer_standing_from_database(peer_address.ip())
2681
                .await
2682
                .unwrap()
2683
                .standing
2684
                .is_negative());
2685
        }
2686

2687
        #[traced_test]
2688
        #[apply(shared_tokio_runtime)]
2689
        async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
2690
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2691
            // A peer requests a block at height 2. Verify that the correct block at height 2 is
2692
            // returned.
2693

2694
            let network = Network::Main;
2695
            let (
2696
                _peer_broadcast_tx,
2697
                from_main_rx_clone,
2698
                to_main_tx,
2699
                _to_main_rx1,
2700
                mut state_lock,
2701
                hsd,
2702
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2703
            let genesis_block = Block::genesis(network);
2704
            let peer_address = get_dummy_socket_address(0);
2705

2706
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2707
                &genesis_block,
2708
                Timestamp::hours(1),
2709
                StdRng::seed_from_u64(5550001).random(),
2710
                network,
2711
            )
2712
            .await;
2713
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2714
                &block_1,
2715
                Timestamp::hours(1),
2716
                StdRng::seed_from_u64(5550002).random(),
2717
                network,
2718
            )
2719
            .await;
2720
            assert_ne!(block_2_a.hash(), block_2_b.hash());
2721

2722
            state_lock.set_new_tip(block_1.clone()).await?;
2723
            state_lock.set_new_tip(block_2_a.clone()).await?;
2724
            state_lock.set_new_tip(block_2_b.clone()).await?;
2725
            state_lock.set_new_tip(block_3_b.clone()).await?;
2726
            state_lock.set_new_tip(block_3_a.clone()).await?;
2727

2728
            let mock = Mock::new(vec![
2729
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2730
                Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
2731
                Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
2732
                Action::Write(PeerMessage::Block(Box::new(
2733
                    block_3_a.clone().try_into().unwrap(),
2734
                ))),
2735
                Action::Read(PeerMessage::Bye),
2736
            ]);
2737

2738
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2739
                to_main_tx.clone(),
2740
                state_lock.clone(),
2741
                peer_address,
2742
                hsd,
2743
                false,
2744
                1,
2745
                block_3_a.header().timestamp,
2746
            );
2747

2748
            // This will return error if seen read/write order does not match that of the
2749
            // mocked object.
2750
            peer_loop_handler
2751
                .run_wrapper(mock, from_main_rx_clone)
2752
                .await?;
2753

2754
            Ok(())
2755
        }
2756

2757
        #[traced_test]
2758
        #[apply(shared_tokio_runtime)]
2759
        async fn receival_of_block_notification_height_1() {
2760
            // Scenario: client only knows genesis block. Then receives block
2761
            // notification of height 1. Must request block 1.
2762
            let network = Network::Main;
2763
            let mut rng = StdRng::seed_from_u64(5552401);
2764
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2765
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2766
                    .await
2767
                    .unwrap();
2768
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2769
            let notification_height1 = (&block_1).into();
2770
            let mock = Mock::new(vec![
2771
                Action::Read(PeerMessage::BlockNotification(notification_height1)),
2772
                Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2773
                Action::Read(PeerMessage::Bye),
2774
            ]);
2775

2776
            let peer_address = get_dummy_socket_address(0);
2777
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2778
                to_main_tx.clone(),
2779
                state_lock.clone(),
2780
                peer_address,
2781
                hsd,
2782
                false,
2783
                1,
2784
                block_1.header().timestamp,
2785
            );
2786
            peer_loop_handler
2787
                .run_wrapper(mock, from_main_rx_clone)
2788
                .await
2789
                .unwrap();
2790

2791
            drop(to_main_rx1);
2792
        }
2793

2794
        #[traced_test]
2795
        #[apply(shared_tokio_runtime)]
2796
        async fn receive_block_request_by_height_block_7() {
2797
            // Scenario: client only knows blocks up to height 7. Then receives block-
2798
            // request-by-height for height 7. Must respond with block 7.
2799
            let network = Network::Main;
2800
            let mut rng = StdRng::seed_from_u64(5552401);
2801
            let (
2802
                _peer_broadcast_tx,
2803
                from_main_rx_clone,
2804
                to_main_tx,
2805
                to_main_rx1,
2806
                mut state_lock,
2807
                hsd,
2808
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
2809
                .await
2810
                .unwrap();
2811
            let genesis_block = Block::genesis(network);
2812
            let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2813
                &genesis_block,
2814
                Timestamp::hours(1),
2815
                rng.random(),
2816
                network,
2817
            )
2818
            .await;
2819
            let block7 = blocks.last().unwrap().to_owned();
2820
            let tip_height: u64 = block7.header().height.into();
2821
            assert_eq!(7, tip_height);
2822

2823
            for block in &blocks {
2824
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
2825
            }
2826

2827
            let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2828
            let mock = Mock::new(vec![
2829
                Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2830
                Action::Write(block7_response),
2831
                Action::Read(PeerMessage::Bye),
2832
            ]);
2833

2834
            let peer_address = get_dummy_socket_address(0);
2835
            let mut peer_loop_handler = PeerLoopHandler::new(
2836
                to_main_tx.clone(),
2837
                state_lock.clone(),
2838
                peer_address,
2839
                hsd,
2840
                false,
2841
                1,
2842
            );
2843
            peer_loop_handler
2844
                .run_wrapper(mock, from_main_rx_clone)
2845
                .await
2846
                .unwrap();
2847

2848
            drop(to_main_rx1);
2849
        }
2850

2851
        #[traced_test]
2852
        #[apply(shared_tokio_runtime)]
2853
        async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2854
            // Scenario: client only knows genesis block. Then receives block 1.
2855

2856
            let network = Network::Main;
2857
            let mut rng = StdRng::seed_from_u64(5550001);
2858
            let (
2859
                _peer_broadcast_tx,
2860
                from_main_rx_clone,
2861
                to_main_tx,
2862
                mut to_main_rx1,
2863
                state_lock,
2864
                hsd,
2865
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2866
            let peer_address = get_dummy_socket_address(0);
2867

2868
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2869
            let mock = Mock::new(vec![
2870
                Action::Read(PeerMessage::Block(Box::new(
2871
                    block_1.clone().try_into().unwrap(),
2872
                ))),
2873
                Action::Read(PeerMessage::Bye),
2874
            ]);
2875

2876
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2877
                to_main_tx.clone(),
2878
                state_lock.clone(),
2879
                peer_address,
2880
                hsd,
2881
                false,
2882
                1,
2883
                block_1.header().timestamp,
2884
            );
2885
            peer_loop_handler
2886
                .run_wrapper(mock, from_main_rx_clone)
2887
                .await?;
2888

2889
            // Verify that a block was sent to `main_loop`
2890
            match to_main_rx1.recv().await {
2891
                Some(PeerTaskToMain::NewBlocks(_block)) => (),
2892
                _ => bail!("Did not find msg sent to main task"),
2893
            };
2894

2895
            match to_main_rx1.recv().await {
2896
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2897
                _ => bail!("Must receive remove of peer block max height"),
2898
            }
2899

2900
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2901
                bail!("peer map must be empty after closing connection gracefully");
2902
            }
2903

2904
            Ok(())
2905
        }
2906

2907
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
            // In this scenario, the client only knows the genesis block (block 0) and then
            // receives block 2, meaning that block 1 will have to be requested.

            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                mut to_main_rx1,
                state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let peer_address = get_dummy_socket_address(0);
            // Fetch genesis via the archival state; at this point it is the tip.
            let genesis_block: Block = state_lock
                .lock_guard()
                .await
                .chain
                .archival_state()
                .get_tip()
                .await;
            let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;

            // Expected wire exchange: peer sends block 2 (unknown parent), the
            // node asks for block 1 by hash, peer delivers it, then hangs up.
            // The Mock errors out if the actual read/write order differs.
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::Block(Box::new(
                    block_2.clone().try_into().unwrap(),
                ))),
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
                Action::Read(PeerMessage::Block(Box::new(
                    block_1.clone().try_into().unwrap(),
                ))),
                Action::Read(PeerMessage::Bye),
            ]);

            // Clock is mocked to block 2's timestamp so the received blocks are
            // not considered to be from the future.
            // NOTE(review): the 5th argument (`true` here, `false` in sibling
            // tests) is an undocumented positional flag — confirm its meaning
            // against PeerLoopHandler::with_mocked_time.
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                true,
                1,
                block_2.header().timestamp,
            );
            peer_loop_handler
                .run_wrapper(mock, from_main_rx_clone)
                .await?;

            // Blocks must reach the main loop in parent-first order: 1 then 2.
            match to_main_rx1.recv().await {
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
                    if blocks[0].hash() != block_1.hash() {
                        bail!("1st received block by main loop must be block 1");
                    }
                    if blocks[1].hash() != block_2.hash() {
                        bail!("2nd received block by main loop must be block 2");
                    }
                }
                _ => bail!("Did not find msg sent to main task 1"),
            };
            // Graceful disconnect must trigger removal of the peer's max-height
            // bookkeeping entry.
            match to_main_rx1.recv().await {
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
                _ => bail!("Must receive remove of peer block max height"),
            }

            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
                bail!("peer map must be empty after closing connection gracefully");
            }

            Ok(())
        }
2984

2985
        #[traced_test]
2986
        #[apply(shared_tokio_runtime)]
2987
        async fn prevent_ram_exhaustion_test() -> Result<()> {
2988
            // In this scenario the peer sends more blocks than the client allows to store in the
2989
            // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
2990
            // process as the alternative is that the program will crash because it runs out of RAM.
2991

2992
            let network = Network::Main;
2993
            let mut rng = StdRng::seed_from_u64(5550001);
2994
            let (
2995
                _peer_broadcast_tx,
2996
                from_main_rx_clone,
2997
                to_main_tx,
2998
                mut to_main_rx1,
2999
                mut state_lock,
3000
                _hsd,
3001
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3002
            let genesis_block = Block::genesis(network);
3003

3004
            // Restrict max number of blocks held in memory to 2.
3005
            let mut cli = state_lock.cli().clone();
3006
            cli.sync_mode_threshold = 2;
3007
            state_lock.set_cli(cli).await;
3008

3009
            let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Beta, 1);
3010
            let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3011
                &genesis_block,
3012
                Timestamp::hours(1),
3013
                rng.random(),
3014
                network,
3015
            )
3016
            .await;
3017
            state_lock.set_new_tip(block_1.clone()).await?;
3018

3019
            let mock = Mock::new(vec![
3020
                Action::Read(PeerMessage::Block(Box::new(
3021
                    block_4.clone().try_into().unwrap(),
3022
                ))),
3023
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3024
                Action::Read(PeerMessage::Block(Box::new(
3025
                    block_3.clone().try_into().unwrap(),
3026
                ))),
3027
                Action::Read(PeerMessage::Bye),
3028
            ]);
3029

3030
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3031
                to_main_tx.clone(),
3032
                state_lock.clone(),
3033
                peer_address1,
3034
                hsd1,
3035
                true,
3036
                1,
3037
                block_4.header().timestamp,
3038
            );
3039
            peer_loop_handler
3040
                .run_wrapper(mock, from_main_rx_clone)
3041
                .await?;
3042

3043
            match to_main_rx1.recv().await {
3044
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3045
                _ => bail!("Must receive remove of peer block max height"),
3046
            }
3047

3048
            // Verify that no block is sent to main loop.
3049
            match to_main_rx1.try_recv() {
3050
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
3051
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
3052
        };
3053
            drop(to_main_tx);
3054

3055
            // Verify that peer is sanctioned for failed fork reconciliation attempt
3056
            assert!(state_lock
3057
                .lock_guard()
3058
                .await
3059
                .net
3060
                .get_peer_standing_from_database(peer_address1.ip())
3061
                .await
3062
                .unwrap()
3063
                .standing
3064
                .is_negative());
3065

3066
            Ok(())
3067
        }
3068

3069
        #[traced_test]
3070
        #[apply(shared_tokio_runtime)]
3071
        async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3072
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3073
            // then receives block 4, meaning that block 3 and 2 will have to be requested.
3074

3075
            let network = Network::Main;
3076
            let (
3077
                _peer_broadcast_tx,
3078
                from_main_rx_clone,
3079
                to_main_tx,
3080
                mut to_main_rx1,
3081
                mut state_lock,
3082
                hsd,
3083
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3084
                .await
3085
                .unwrap();
3086
            let peer_address: SocketAddr = get_dummy_socket_address(0);
3087
            let genesis_block = Block::genesis(network);
3088
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3089
                &genesis_block,
3090
                Timestamp::hours(1),
3091
                StdRng::seed_from_u64(5550001).random(),
3092
                network,
3093
            )
3094
            .await;
3095
            state_lock.set_new_tip(block_1.clone()).await.unwrap();
3096

3097
            let mock = Mock::new(vec![
3098
                Action::Read(PeerMessage::Block(Box::new(
3099
                    block_4.clone().try_into().unwrap(),
3100
                ))),
3101
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3102
                Action::Read(PeerMessage::Block(Box::new(
3103
                    block_3.clone().try_into().unwrap(),
3104
                ))),
3105
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3106
                Action::Read(PeerMessage::Block(Box::new(
3107
                    block_2.clone().try_into().unwrap(),
3108
                ))),
3109
                Action::Read(PeerMessage::Bye),
3110
            ]);
3111

3112
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3113
                to_main_tx.clone(),
3114
                state_lock.clone(),
3115
                peer_address,
3116
                hsd,
3117
                true,
3118
                1,
3119
                block_4.header().timestamp,
3120
            );
3121
            peer_loop_handler
3122
                .run_wrapper(mock, from_main_rx_clone)
3123
                .await
3124
                .unwrap();
3125

3126
            let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3127
                panic!("Did not find msg sent to main task");
3128
            };
3129
            assert_eq!(blocks[0].hash(), block_2.hash());
3130
            assert_eq!(blocks[1].hash(), block_3.hash());
3131
            assert_eq!(blocks[2].hash(), block_4.hash());
3132

3133
            let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3134
                panic!("Must receive remove of peer block max height");
3135
            };
3136

3137
            assert!(
3138
                state_lock.lock_guard().await.net.peer_map.is_empty(),
3139
                "peer map must be empty after closing connection gracefully"
3140
            );
3141
        }
3142

3143
        #[traced_test]
        #[apply(shared_tokio_runtime)]
        async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
            // In this scenario, the client only knows the genesis block (block 0) and then
            // receives block 3, meaning that block 2 and 1 will have to be requested.

            let network = Network::Main;
            let (
                _peer_broadcast_tx,
                from_main_rx_clone,
                to_main_tx,
                mut to_main_rx1,
                state_lock,
                hsd,
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
            let peer_address = get_dummy_socket_address(0);
            let genesis_block = Block::genesis(network);

            let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
                network,
            )
            .await;

            // Expected wire exchange, newest-first backtracking: block 3 in,
            // request 2, block 2 in, request 1, block 1 in, then disconnect.
            // The Mock errors out if the actual read/write order differs.
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::Block(Box::new(
                    block_3.clone().try_into().unwrap(),
                ))),
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
                Action::Read(PeerMessage::Block(Box::new(
                    block_2.clone().try_into().unwrap(),
                ))),
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
                Action::Read(PeerMessage::Block(Box::new(
                    block_1.clone().try_into().unwrap(),
                ))),
                Action::Read(PeerMessage::Bye),
            ]);

            // Clock is mocked to block 3's timestamp so the received blocks are
            // not considered to be from the future.
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd,
                true,
                1,
                block_3.header().timestamp,
            );
            peer_loop_handler
                .run_wrapper(mock, from_main_rx_clone)
                .await?;

            // Blocks must reach the main loop parent-first: 1, 2, 3.
            match to_main_rx1.recv().await {
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
                    if blocks[0].hash() != block_1.hash() {
                        bail!("1st received block by main loop must be block 1");
                    }
                    if blocks[1].hash() != block_2.hash() {
                        bail!("2nd received block by main loop must be block 2");
                    }
                    if blocks[2].hash() != block_3.hash() {
                        bail!("3rd received block by main loop must be block 3");
                    }
                }
                _ => bail!("Did not find msg sent to main task"),
            };
            // Graceful disconnect must trigger removal of the peer's max-height
            // bookkeeping entry.
            match to_main_rx1.recv().await {
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
                _ => bail!("Must receive remove of peer block max height"),
            }

            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
                bail!("peer map must be empty after closing connection gracefully");
            }

            Ok(())
        }
3222

3223
        #[traced_test]
3224
        #[apply(shared_tokio_runtime)]
3225
        async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3226
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3227
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3228
            // But the requests are interrupted by the peer sending another message: a new block
3229
            // notification.
3230

3231
            let network = Network::Main;
3232
            let (
3233
                _peer_broadcast_tx,
3234
                from_main_rx_clone,
3235
                to_main_tx,
3236
                mut to_main_rx1,
3237
                mut state_lock,
3238
                hsd,
3239
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3240
            let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3241
            let genesis_block: Block = state_lock
3242
                .lock_guard()
3243
                .await
3244
                .chain
3245
                .archival_state()
3246
                .get_tip()
3247
                .await;
3248

3249
            let [block_1, block_2, block_3, block_4, block_5] =
3250
                fake_valid_sequence_of_blocks_for_tests(
3251
                    &genesis_block,
3252
                    Timestamp::hours(1),
3253
                    StdRng::seed_from_u64(5550001).random(),
3254
                    network,
3255
                )
3256
                .await;
3257
            state_lock.set_new_tip(block_1.clone()).await?;
3258

3259
            let mock = Mock::new(vec![
3260
                Action::Read(PeerMessage::Block(Box::new(
3261
                    block_4.clone().try_into().unwrap(),
3262
                ))),
3263
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3264
                Action::Read(PeerMessage::Block(Box::new(
3265
                    block_3.clone().try_into().unwrap(),
3266
                ))),
3267
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3268
                //
3269
                // Now make the interruption of the block reconciliation process
3270
                Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3271
                //
3272
                // Complete the block reconciliation process by requesting the last block
3273
                // in this process, to get back to a mutually known block.
3274
                Action::Read(PeerMessage::Block(Box::new(
3275
                    block_2.clone().try_into().unwrap(),
3276
                ))),
3277
                //
3278
                // Then anticipate the request of the block that was announced
3279
                // in the interruption.
3280
                // Note that we cannot anticipate the response, as only the main
3281
                // task writes to the database. And the database needs to be updated
3282
                // for the handling of block 5 to be done correctly.
3283
                Action::Write(PeerMessage::BlockRequestByHeight(
3284
                    block_5.kernel.header.height,
3285
                )),
3286
                Action::Read(PeerMessage::Bye),
3287
            ]);
3288

3289
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3290
                to_main_tx.clone(),
3291
                state_lock.clone(),
3292
                peer_socket_address,
3293
                hsd,
3294
                false,
3295
                1,
3296
                block_5.header().timestamp,
3297
            );
3298
            peer_loop_handler
3299
                .run_wrapper(mock, from_main_rx_clone)
3300
                .await?;
3301

3302
            match to_main_rx1.recv().await {
3303
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3304
                    if blocks[0].hash() != block_2.hash() {
3305
                        bail!("1st received block by main loop must be block 1");
3306
                    }
3307
                    if blocks[1].hash() != block_3.hash() {
3308
                        bail!("2nd received block by main loop must be block 2");
3309
                    }
3310
                    if blocks[2].hash() != block_4.hash() {
3311
                        bail!("3rd received block by main loop must be block 3");
3312
                    }
3313
                }
3314
                _ => bail!("Did not find msg sent to main task"),
3315
            };
3316
            match to_main_rx1.recv().await {
3317
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3318
                _ => bail!("Must receive remove of peer block max height"),
3319
            }
3320

3321
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3322
                bail!("peer map must be empty after closing connection gracefully");
3323
            }
3324

3325
            Ok(())
3326
        }
3327

3328
        #[traced_test]
3329
        #[apply(shared_tokio_runtime)]
3330
        async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3331
            // In this scenario, the client knows the genesis block (block 0) and block 1, it
3332
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3333
            // But the requests are interrupted by the peer sending another message: a request
3334
            // for a list of peers.
3335

3336
            let network = Network::Main;
3337
            let (
3338
                _peer_broadcast_tx,
3339
                from_main_rx_clone,
3340
                to_main_tx,
3341
                mut to_main_rx1,
3342
                mut state_lock,
3343
                _hsd,
3344
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3345
            let genesis_block = Block::genesis(network);
3346
            let peer_infos: Vec<PeerInfo> = state_lock
3347
                .lock_guard()
3348
                .await
3349
                .net
3350
                .peer_map
3351
                .clone()
3352
                .into_values()
3353
                .collect::<Vec<_>>();
3354

3355
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3356
                &genesis_block,
3357
                Timestamp::hours(1),
3358
                StdRng::seed_from_u64(5550001).random(),
3359
                network,
3360
            )
3361
            .await;
3362
            state_lock.set_new_tip(block_1.clone()).await?;
3363

3364
            let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3365
            let expected_peer_list_resp = vec![
3366
                (
3367
                    peer_infos[0].listen_address().unwrap(),
3368
                    peer_infos[0].instance_id(),
3369
                ),
3370
                (sa_1, hsd_1.instance_id),
3371
            ];
3372
            let mock = Mock::new(vec![
3373
                Action::Read(PeerMessage::Block(Box::new(
3374
                    block_4.clone().try_into().unwrap(),
3375
                ))),
3376
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3377
                Action::Read(PeerMessage::Block(Box::new(
3378
                    block_3.clone().try_into().unwrap(),
3379
                ))),
3380
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3381
                //
3382
                // Now make the interruption of the block reconciliation process
3383
                Action::Read(PeerMessage::PeerListRequest),
3384
                //
3385
                // Answer the request for a peer list
3386
                Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3387
                //
3388
                // Complete the block reconciliation process by requesting the last block
3389
                // in this process, to get back to a mutually known block.
3390
                Action::Read(PeerMessage::Block(Box::new(
3391
                    block_2.clone().try_into().unwrap(),
3392
                ))),
3393
                Action::Read(PeerMessage::Bye),
3394
            ]);
3395

3396
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3397
                to_main_tx,
3398
                state_lock.clone(),
3399
                sa_1,
3400
                hsd_1,
3401
                true,
3402
                1,
3403
                block_4.header().timestamp,
3404
            );
3405
            peer_loop_handler
3406
                .run_wrapper(mock, from_main_rx_clone)
3407
                .await?;
3408

3409
            // Verify that blocks are sent to `main_loop` in expected ordering
3410
            match to_main_rx1.recv().await {
3411
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3412
                    if blocks[0].hash() != block_2.hash() {
3413
                        bail!("1st received block by main loop must be block 1");
3414
                    }
3415
                    if blocks[1].hash() != block_3.hash() {
3416
                        bail!("2nd received block by main loop must be block 2");
3417
                    }
3418
                    if blocks[2].hash() != block_4.hash() {
3419
                        bail!("3rd received block by main loop must be block 3");
3420
                    }
3421
                }
3422
                _ => bail!("Did not find msg sent to main task"),
3423
            };
3424

3425
            assert_eq!(
3426
                1,
3427
                state_lock.lock_guard().await.net.peer_map.len(),
3428
                "One peer must remain in peer list after peer_1 closed gracefully"
3429
            );
3430

3431
            Ok(())
3432
        }
3433
    }
3434

3435
    mod transactions {
3436
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToProofCollection;
3437
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToSingleProof;
3438
        use crate::models::blockchain::transaction::primitive_witness::PrimitiveWitness;
3439
        use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
3440
        use crate::tests::shared::blocks::fake_deterministic_successor;
3441
        use crate::tests::shared::mock_tx::genesis_tx_with_proof_type;
3442
        use crate::triton_vm_job_queue::vm_job_queue;
3443

3444
        use super::*;
3445

3446
        #[traced_test]
3447
        #[apply(shared_tokio_runtime)]
3448
        async fn receive_transaction_request() {
3449
            let network = Network::Main;
3450
            let dummy_tx = invalid_empty_single_proof_transaction();
3451
            let txid = dummy_tx.kernel.txid();
3452

3453
            for transaction_is_known in [false, true] {
3454
                let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3455
                    get_test_genesis_setup(network, 1, cli_args::Args::default())
3456
                        .await
3457
                        .unwrap();
3458
                if transaction_is_known {
3459
                    state_lock
3460
                        .lock_guard_mut()
3461
                        .await
3462
                        .mempool_insert(dummy_tx.clone(), UpgradePriority::Irrelevant)
3463
                        .await;
3464
                }
3465

3466
                let mock = if transaction_is_known {
3467
                    Mock::new(vec![
3468
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3469
                        Action::Write(PeerMessage::Transaction(Box::new(
3470
                            (&dummy_tx).try_into().unwrap(),
3471
                        ))),
3472
                        Action::Read(PeerMessage::Bye),
3473
                    ])
3474
                } else {
3475
                    Mock::new(vec![
3476
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3477
                        Action::Read(PeerMessage::Bye),
3478
                    ])
3479
                };
3480

3481
                let hsd = get_dummy_handshake_data_for_genesis(network);
3482
                let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3483
                let mut peer_loop_handler = PeerLoopHandler::new(
3484
                    to_main_tx,
3485
                    state_lock,
3486
                    get_dummy_socket_address(0),
3487
                    hsd,
3488
                    true,
3489
                    1,
3490
                );
3491

3492
                peer_loop_handler
3493
                    .run(mock, from_main_rx, &mut peer_state)
3494
                    .await
3495
                    .unwrap();
3496
            }
3497
        }
3498

3499
        #[traced_test]
3500
        #[apply(shared_tokio_runtime)]
3501
        async fn empty_mempool_request_tx_test() {
3502
            // In this scenario the client receives a transaction notification from
3503
            // a peer of a transaction it doesn't know; the client must then request it.
3504

3505
            let network = Network::Main;
3506
            let (
3507
                _peer_broadcast_tx,
3508
                from_main_rx_clone,
3509
                to_main_tx,
3510
                mut to_main_rx1,
3511
                state_lock,
3512
                _hsd,
3513
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3514
                .await
3515
                .unwrap();
3516

3517
            let spending_key = state_lock
3518
                .lock_guard()
3519
                .await
3520
                .wallet_state
3521
                .wallet_entropy
3522
                .nth_symmetric_key_for_tests(0);
3523
            let genesis_block = Block::genesis(network);
3524
            let now = genesis_block.kernel.header.timestamp;
3525
            let config = TxCreationConfig::default()
3526
                .recover_change_off_chain(spending_key.into())
3527
                .with_prover_capability(TxProvingCapability::ProofCollection);
3528
            let transaction_1: Transaction = state_lock
3529
                .api()
3530
                .tx_initiator_internal()
3531
                .create_transaction(
3532
                    Default::default(),
3533
                    NativeCurrencyAmount::coins(0),
3534
                    now,
3535
                    config,
3536
                )
3537
                .await
3538
                .unwrap()
3539
                .transaction
3540
                .into();
3541

3542
            // Build the resulting transaction notification
3543
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3544
            let mock = Mock::new(vec![
3545
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3546
                Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3547
                Action::Read(PeerMessage::Transaction(Box::new(
3548
                    (&transaction_1).try_into().unwrap(),
3549
                ))),
3550
                Action::Read(PeerMessage::Bye),
3551
            ]);
3552

3553
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3554

3555
            // Mock a timestamp to allow transaction to be considered valid
3556
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3557
                to_main_tx,
3558
                state_lock.clone(),
3559
                get_dummy_socket_address(0),
3560
                hsd_1.clone(),
3561
                true,
3562
                1,
3563
                now,
3564
            );
3565

3566
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3567

3568
            assert!(
3569
                state_lock.lock_guard().await.mempool.is_empty(),
3570
                "Mempool must be empty at init"
3571
            );
3572
            peer_loop_handler
3573
                .run(mock, from_main_rx_clone, &mut peer_state)
3574
                .await
3575
                .unwrap();
3576

3577
            // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3578
            // by the `main_loop`.
3579
            match to_main_rx1.recv().await {
3580
                Some(PeerTaskToMain::Transaction(_)) => (),
3581
                _ => panic!("Must receive remove of peer block max height"),
3582
            };
3583
        }
3584

3585
        #[traced_test]
3586
        #[apply(shared_tokio_runtime)]
3587
        async fn populated_mempool_request_tx_test() -> Result<()> {
3588
            // In this scenario the peer is informed of a transaction that it already knows
3589

3590
            let network = Network::Main;
3591
            let (
3592
                _peer_broadcast_tx,
3593
                from_main_rx_clone,
3594
                to_main_tx,
3595
                mut to_main_rx1,
3596
                mut state_lock,
3597
                _hsd,
3598
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3599
                .await
3600
                .unwrap();
3601
            let spending_key = state_lock
3602
                .lock_guard()
3603
                .await
3604
                .wallet_state
3605
                .wallet_entropy
3606
                .nth_symmetric_key_for_tests(0);
3607

3608
            let genesis_block = Block::genesis(network);
3609
            let now = genesis_block.kernel.header.timestamp;
3610
            let config = TxCreationConfig::default()
3611
                .recover_change_off_chain(spending_key.into())
3612
                .with_prover_capability(TxProvingCapability::ProofCollection);
3613
            let transaction_1: Transaction = state_lock
3614
                .api()
3615
                .tx_initiator_internal()
3616
                .create_transaction(
3617
                    Default::default(),
3618
                    NativeCurrencyAmount::coins(0),
3619
                    now,
3620
                    config,
3621
                )
3622
                .await
3623
                .unwrap()
3624
                .transaction
3625
                .into();
3626

3627
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3628
            let mut peer_loop_handler = PeerLoopHandler::new(
3629
                to_main_tx,
3630
                state_lock.clone(),
3631
                get_dummy_socket_address(0),
3632
                hsd_1.clone(),
3633
                true,
3634
                1,
3635
            );
3636
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3637

3638
            assert!(
3639
                state_lock.lock_guard().await.mempool.is_empty(),
3640
                "Mempool must be empty at init"
3641
            );
3642
            state_lock
3643
                .lock_guard_mut()
3644
                .await
3645
                .mempool_insert(transaction_1.clone(), UpgradePriority::Irrelevant)
3646
                .await;
3647
            assert!(
3648
                !state_lock.lock_guard().await.mempool.is_empty(),
3649
                "Mempool must be non-empty after insertion"
3650
            );
3651

3652
            // Run the peer loop and verify expected exchange -- namely that the
3653
            // tx notification is received and the the transaction is *not*
3654
            // requested.
3655
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3656
            let mock = Mock::new(vec![
3657
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3658
                Action::Read(PeerMessage::Bye),
3659
            ]);
3660
            peer_loop_handler
3661
                .run(mock, from_main_rx_clone, &mut peer_state)
3662
                .await
3663
                .unwrap();
3664

3665
            // nothing is allowed to be sent to `main_loop`
3666
            match to_main_rx1.try_recv() {
3667
                Err(TryRecvError::Empty) => (),
3668
                Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3669
                Ok(_) => panic!("to_main channel must be empty"),
3670
            };
3671
            Ok(())
3672
        }
3673

3674
        #[traced_test]
3675
        #[apply(shared_tokio_runtime)]
3676
        async fn accepts_tx_with_updated_mutator_set() {
3677
            // Scenario: node has transaction in mempool and receives
3678
            // transaction notification for the same transaction with an updated
3679
            // mutator set. The node must request the new transaction and it
3680
            // must be passed on to main loop.
3681
            //
3682
            // Both ProofCollection and SingleProof backed transactions are
3683
            // tested.
3684

3685
            enum ProofType {
3686
                ProofCollection,
3687
                SingleProof,
3688
            }
3689

3690
            let network = Network::Main;
3691

3692
            for proof_type in [ProofType::ProofCollection, ProofType::SingleProof] {
3693
                let proof_job_options = TritonVmProofJobOptions::default();
3694
                let upgrade = async |primitive_witness: PrimitiveWitness| match proof_type {
3695
                    ProofType::ProofCollection => {
3696
                        PrimitiveWitnessToProofCollection { primitive_witness }
3697
                            .upgrade(vm_job_queue(), &proof_job_options)
3698
                            .await
3699
                            .unwrap()
3700
                    }
3701
                    ProofType::SingleProof => PrimitiveWitnessToSingleProof { primitive_witness }
3702
                        .upgrade(vm_job_queue(), &proof_job_options)
3703
                        .await
3704
                        .unwrap(),
3705
                };
3706

3707
                let (
3708
                    _peer_broadcast_tx,
3709
                    from_main_rx_clone,
3710
                    to_main_tx,
3711
                    mut to_main_rx1,
3712
                    mut state_lock,
3713
                    _hsd,
3714
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3715
                    .await
3716
                    .unwrap();
3717
                let fee = NativeCurrencyAmount::from_nau(500);
3718
                let pw_genesis =
3719
                    genesis_tx_with_proof_type(TxProvingCapability::PrimitiveWitness, network, fee)
3720
                        .await
3721
                        .proof
3722
                        .clone()
3723
                        .into_primitive_witness();
3724

3725
                let tx_synced_to_genesis = upgrade(pw_genesis.clone()).await;
3726

3727
                let genesis_block = Block::genesis(network);
3728
                let block1 = fake_deterministic_successor(&genesis_block, network).await;
3729
                state_lock
3730
                    .lock_guard_mut()
3731
                    .await
3732
                    .set_new_tip(block1.clone())
3733
                    .await
3734
                    .unwrap();
3735

3736
                state_lock
3737
                    .lock_guard_mut()
3738
                    .await
3739
                    .mempool_insert(tx_synced_to_genesis, UpgradePriority::Irrelevant)
3740
                    .await;
3741

3742
                // Mempool should now contain the unsynced transaction. Tip is block 1.
3743
                let pw_block1 =
3744
                    pw_genesis.update_with_new_ms_data(block1.mutator_set_update().unwrap());
3745
                let tx_synced_to_block1 = upgrade(pw_block1).await;
3746

3747
                let tx_notification: TransactionNotification =
3748
                    (&tx_synced_to_block1).try_into().unwrap();
3749
                let mock = Mock::new(vec![
3750
                    Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3751
                    Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3752
                    Action::Read(PeerMessage::Transaction(Box::new(
3753
                        (&tx_synced_to_block1).try_into().unwrap(),
3754
                    ))),
3755
                    Action::Read(PeerMessage::Bye),
3756
                ]);
3757

3758
                // Mock a timestamp to allow transaction to be considered valid
3759
                let now = tx_synced_to_block1.kernel.timestamp;
3760
                let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3761
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3762
                    to_main_tx,
3763
                    state_lock.clone(),
3764
                    get_dummy_socket_address(0),
3765
                    hsd_1.clone(),
3766
                    true,
3767
                    1,
3768
                    now,
3769
                );
3770

3771
                let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3772
                peer_loop_handler
3773
                    .run(mock, from_main_rx_clone, &mut peer_state)
3774
                    .await
3775
                    .unwrap();
3776

3777
                // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3778
                // by the `main_loop`.
3779
                match to_main_rx1.recv().await {
3780
                    Some(PeerTaskToMain::Transaction(_)) => (),
3781
                    _ => panic!("Main loop must receive new transaction"),
3782
                };
3783
            }
3784
        }
3785
    }
3786

3787
    mod block_proposals {
3788
        use super::*;
3789

3790
        struct TestSetup {
3791
            peer_loop_handler: PeerLoopHandler,
3792
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3793
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3794
            peer_state: MutablePeerState,
3795
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3796
            genesis_block: Block,
3797
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3798
        }
3799

3800
        async fn genesis_setup(network: Network) -> TestSetup {
3801
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
3802
                get_test_genesis_setup(network, 0, cli_args::Args::default())
3803
                    .await
3804
                    .unwrap();
3805
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
3806
            let peer_loop_handler = PeerLoopHandler::new(
3807
                to_main_tx.clone(),
3808
                alice.clone(),
3809
                get_dummy_socket_address(0),
3810
                peer_hsd.clone(),
3811
                true,
3812
                1,
3813
            );
3814
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
3815

3816
            // (peer_loop_handler, to_main_rx1)
3817
            TestSetup {
3818
                peer_broadcast_tx,
3819
                peer_loop_handler,
3820
                to_main_rx,
3821
                from_main_rx,
3822
                peer_state,
3823
                to_main_tx,
3824
                genesis_block: Block::genesis(network),
3825
            }
3826
        }
3827

3828
        #[traced_test]
3829
        #[apply(shared_tokio_runtime)]
3830
        async fn accept_block_proposal_height_one() {
3831
            // Node knows genesis block, receives a block proposal for block 1
3832
            // and must accept this. Verify that main loop is informed of block
3833
            // proposal.
3834
            let TestSetup {
3835
                peer_broadcast_tx,
3836
                mut peer_loop_handler,
3837
                mut to_main_rx,
3838
                from_main_rx,
3839
                mut peer_state,
3840
                to_main_tx,
3841
                genesis_block,
3842
            } = genesis_setup(Network::Main).await;
3843
            let block1 = fake_valid_block_for_tests(
3844
                &peer_loop_handler.global_state_lock,
3845
                StdRng::seed_from_u64(5550001).random(),
3846
            )
3847
            .await;
3848

3849
            let mock = Mock::new(vec![
3850
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
3851
                Action::Read(PeerMessage::Bye),
3852
            ]);
3853
            peer_loop_handler
3854
                .run(mock, from_main_rx, &mut peer_state)
3855
                .await
3856
                .unwrap();
3857

3858
            match to_main_rx.try_recv().unwrap() {
3859
                PeerTaskToMain::BlockProposal(block) => {
3860
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
3861
                }
3862
                _ => panic!("Expected main loop to be informed of block proposal"),
3863
            };
3864

3865
            drop(to_main_tx);
3866
            drop(peer_broadcast_tx);
3867
        }
3868

3869
        #[traced_test]
3870
        #[apply(shared_tokio_runtime)]
3871
        async fn accept_block_proposal_notification_height_one() {
3872
            // Node knows genesis block, receives a block proposal notification
3873
            // for block 1 and must accept this by requesting the block
3874
            // proposal from peer.
3875
            let TestSetup {
3876
                peer_broadcast_tx,
3877
                mut peer_loop_handler,
3878
                to_main_rx: _,
3879
                from_main_rx,
3880
                mut peer_state,
3881
                to_main_tx,
3882
                ..
3883
            } = genesis_setup(Network::Main).await;
3884
            let block1 = fake_valid_block_for_tests(
3885
                &peer_loop_handler.global_state_lock,
3886
                StdRng::seed_from_u64(5550001).random(),
3887
            )
3888
            .await;
3889

3890
            let mock = Mock::new(vec![
3891
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
3892
                Action::Write(PeerMessage::BlockProposalRequest(
3893
                    BlockProposalRequest::new(block1.body().mast_hash()),
3894
                )),
3895
                Action::Read(PeerMessage::Bye),
3896
            ]);
3897
            peer_loop_handler
3898
                .run(mock, from_main_rx, &mut peer_state)
3899
                .await
3900
                .unwrap();
3901

3902
            drop(to_main_tx);
3903
            drop(peer_broadcast_tx);
3904
        }
3905
    }
3906

3907
    mod proof_qualities {
3908
        use strum::IntoEnumIterator;
3909

3910
        use super::*;
3911
        use crate::config_models::cli_args;
3912
        use crate::models::blockchain::transaction::Transaction;
3913
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3914
        use crate::models::state::wallet::transaction_output::TxOutput;
3915
        use crate::tests::shared::globalstate::mock_genesis_global_state;
3916

3917
        async fn tx_of_proof_quality(
3918
            network: Network,
3919
            quality: TransactionProofQuality,
3920
        ) -> Transaction {
3921
            let wallet_secret = WalletEntropy::devnet_wallet();
3922
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
3923
            let alice_gsl = mock_genesis_global_state(
3924
                1,
3925
                wallet_secret,
3926
                cli_args::Args::default_with_network(network),
3927
            )
3928
            .await;
3929
            let alice = alice_gsl.lock_guard().await;
3930
            let genesis_block = alice.chain.light_state();
3931
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
3932
            let prover_capability = match quality {
3933
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
3934
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
3935
            };
3936
            let config = TxCreationConfig::default()
3937
                .recover_change_off_chain(alice_key.into())
3938
                .with_prover_capability(prover_capability);
3939
            alice_gsl
3940
                .api()
3941
                .tx_initiator_internal()
3942
                .create_transaction(
3943
                    Vec::<TxOutput>::new().into(),
3944
                    NativeCurrencyAmount::coins(1),
3945
                    in_seven_months,
3946
                    config,
3947
                )
3948
                .await
3949
                .unwrap()
3950
                .transaction()
3951
                .clone()
3952
        }
3953

3954
        #[traced_test]
3955
        #[apply(shared_tokio_runtime)]
3956
        async fn client_favors_higher_proof_quality() {
3957
            // In this scenario the peer is informed of a transaction that it
3958
            // already knows, and it's tested that it checks the proof quality
3959
            // field and verifies that it exceeds the proof in the mempool
3960
            // before requesting the transasction.
3961
            let network = Network::Main;
3962
            let proof_collection_tx =
3963
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
3964
            let single_proof_tx =
3965
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
3966

3967
            for (own_tx_pq, new_tx_pq) in
3968
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
3969
            {
3970
                use TransactionProofQuality::*;
3971

3972
                let (
3973
                    _peer_broadcast_tx,
3974
                    from_main_rx_clone,
3975
                    to_main_tx,
3976
                    mut to_main_rx1,
3977
                    mut alice,
3978
                    handshake_data,
3979
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3980
                    .await
3981
                    .unwrap();
3982

3983
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
3984
                    (ProofCollection, ProofCollection) => {
3985
                        (&proof_collection_tx, &proof_collection_tx)
3986
                    }
3987
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
3988
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
3989
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
3990
                };
3991

3992
                alice
3993
                    .lock_guard_mut()
3994
                    .await
3995
                    .mempool_insert((*own_tx).to_owned(), UpgradePriority::Irrelevant)
3996
                    .await;
3997

3998
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
3999

4000
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4001
                let mock = if own_proof_is_supreme {
4002
                    Mock::new(vec![
4003
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
4004
                        Action::Read(PeerMessage::Bye),
4005
                    ])
4006
                } else {
4007
                    Mock::new(vec![
4008
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
4009
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
4010
                        Action::Read(PeerMessage::Transaction(Box::new(
4011
                            new_tx.try_into().unwrap(),
4012
                        ))),
4013
                        Action::Read(PeerMessage::Bye),
4014
                    ])
4015
                };
4016

4017
                let now = proof_collection_tx.kernel.timestamp;
4018
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4019
                    to_main_tx,
4020
                    alice.clone(),
4021
                    get_dummy_socket_address(0),
4022
                    handshake_data.clone(),
4023
                    true,
4024
                    1,
4025
                    now,
4026
                );
4027
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4028

4029
                peer_loop_handler
4030
                    .run(mock, from_main_rx_clone, &mut peer_state)
4031
                    .await
4032
                    .unwrap();
4033

4034
                if own_proof_is_supreme {
4035
                    match to_main_rx1.try_recv() {
4036
                        Err(TryRecvError::Empty) => (),
4037
                        Err(TryRecvError::Disconnected) => {
4038
                            panic!("to_main channel must still be open")
4039
                        }
4040
                        Ok(_) => panic!("to_main channel must be empty"),
4041
                    }
4042
                } else {
4043
                    match to_main_rx1.try_recv() {
4044
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
4045
                        Err(TryRecvError::Disconnected) => {
4046
                            panic!("to_main channel must still be open")
4047
                        }
4048
                        Ok(PeerTaskToMain::Transaction(_)) => (),
4049
                        _ => panic!("Unexpected result from channel"),
4050
                    }
4051
                }
4052
            }
4053
        }
4054
    }
4055

4056
    mod sync_challenges {
4057
        use super::*;
4058
        use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests_dyn;
4059

4060
        #[traced_test]
4061
        #[apply(shared_tokio_runtime)]
4062
        async fn bad_sync_challenge_height_greater_than_tip() {
4063
            // Criterium: Challenge height may not exceed that of tip in the
4064
            // request.
4065

4066
            let network = Network::Main;
4067
            let (
4068
                _alice_main_to_peer_tx,
4069
                alice_main_to_peer_rx,
4070
                alice_peer_to_main_tx,
4071
                alice_peer_to_main_rx,
4072
                mut alice,
4073
                alice_hsd,
4074
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
4075
                .await
4076
                .unwrap();
4077
            let genesis_block: Block = Block::genesis(network);
4078
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
4079
                &genesis_block,
4080
                Timestamp::hours(1),
4081
                Default::default(),
4082
                network,
4083
            )
4084
            .await;
4085
            for block in &blocks {
4086
                alice.set_new_tip(block.clone()).await.unwrap();
4087
            }
4088

4089
            let bh12 = blocks.last().unwrap().header().height;
4090
            let sync_challenge = SyncChallenge {
4091
                tip_digest: blocks[9].hash(),
4092
                challenges: [bh12; 10],
4093
            };
4094
            let alice_p2p_messages = Mock::new(vec![
4095
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4096
                Action::Read(PeerMessage::Bye),
4097
            ]);
4098

4099
            let peer_address = get_dummy_socket_address(0);
4100
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4101
                alice_peer_to_main_tx.clone(),
4102
                alice.clone(),
4103
                peer_address,
4104
                alice_hsd,
4105
                false,
4106
                1,
4107
            );
4108
            alice_peer_loop_handler
4109
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4110
                .await
4111
                .unwrap();
4112

4113
            drop(alice_peer_to_main_rx);
4114

4115
            let latest_sanction = alice
4116
                .lock_guard()
4117
                .await
4118
                .net
4119
                .get_peer_standing_from_database(peer_address.ip())
4120
                .await
4121
                .unwrap();
4122
            assert_eq!(
4123
                NegativePeerSanction::InvalidSyncChallenge,
4124
                latest_sanction
4125
                    .latest_punishment
4126
                    .expect("peer must be sanctioned")
4127
                    .0
4128
            );
4129
        }
4130

4131
        #[traced_test]
4132
        #[apply(shared_tokio_runtime)]
4133
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
4134
            // Criterium: Challenge may not point to genesis block, or block 1, as
4135
            // tip.
4136

4137
            let network = Network::Main;
4138
            let genesis_block: Block = Block::genesis(network);
4139

4140
            let alice_cli = cli_args::Args::default();
4141
            let (
4142
                _alice_main_to_peer_tx,
4143
                alice_main_to_peer_rx,
4144
                alice_peer_to_main_tx,
4145
                alice_peer_to_main_rx,
4146
                alice,
4147
                alice_hsd,
4148
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
4149

4150
            let sync_challenge = SyncChallenge {
4151
                tip_digest: genesis_block.hash(),
4152
                challenges: [BlockHeight::genesis(); 10],
4153
            };
4154

4155
            let alice_p2p_messages = Mock::new(vec![
4156
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4157
                Action::Read(PeerMessage::Bye),
4158
            ]);
4159

4160
            let peer_address = get_dummy_socket_address(0);
4161
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4162
                alice_peer_to_main_tx.clone(),
4163
                alice.clone(),
4164
                peer_address,
4165
                alice_hsd,
4166
                false,
4167
                1,
4168
            );
4169
            alice_peer_loop_handler
4170
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4171
                .await
4172
                .unwrap();
4173

4174
            drop(alice_peer_to_main_rx);
4175

4176
            let latest_sanction = alice
4177
                .lock_guard()
4178
                .await
4179
                .net
4180
                .get_peer_standing_from_database(peer_address.ip())
4181
                .await
4182
                .unwrap();
4183
            assert_eq!(
4184
                NegativePeerSanction::InvalidSyncChallenge,
4185
                latest_sanction
4186
                    .latest_punishment
4187
                    .expect("peer must be sanctioned")
4188
                    .0
4189
            );
4190
        }
4191

4192
        #[traced_test]
4193
        #[apply(shared_tokio_runtime)]
4194
        async fn sync_challenge_happy_path() -> Result<()> {
4195
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
4196
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
4197
            // sync mode.
4198

4199
            let mut rng = rand::rng();
4200
            let network = Network::Main;
4201
            let genesis_block: Block = Block::genesis(network);
4202

4203
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
4204
            let alice_cli = cli_args::Args {
4205
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
4206
                ..Default::default()
4207
            };
4208
            let (
4209
                _alice_main_to_peer_tx,
4210
                alice_main_to_peer_rx,
4211
                alice_peer_to_main_tx,
4212
                mut alice_peer_to_main_rx,
4213
                mut alice,
4214
                alice_hsd,
4215
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
4216
            let _alice_socket_address = get_dummy_socket_address(0);
4217

4218
            let (
4219
                _bob_main_to_peer_tx,
4220
                _bob_main_to_peer_rx,
4221
                _bob_peer_to_main_tx,
4222
                _bob_peer_to_main_rx,
4223
                mut bob,
4224
                _bob_hsd,
4225
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
4226
            let bob_socket_address = get_dummy_socket_address(0);
4227

4228
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
4229
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
4230
            assert!(
4231
                block_1.is_valid(&genesis_block, now, network).await,
4232
                "Block must be valid for this test to make sense"
4233
            );
4234
            let alice_tip = &block_1;
4235
            alice.set_new_tip(block_1.clone()).await?;
4236
            bob.set_new_tip(block_1.clone()).await?;
4237

4238
            // produce enough blocks to ensure alice needs to go into sync mode
4239
            // with this block notification.
4240
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
4241
                &block_1,
4242
                network.target_block_interval(),
4243
                (0..rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20))
4244
                    .map(|_| rng.random())
4245
                    .collect_vec(),
4246
                network,
4247
            )
4248
            .await;
4249
            for block in &blocks {
4250
                bob.set_new_tip(block.clone()).await?;
4251
            }
4252
            let bob_tip = blocks.last().unwrap();
4253

4254
            let block_notification_from_bob = PeerBlockNotification {
4255
                hash: bob_tip.hash(),
4256
                height: bob_tip.header().height,
4257
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
4258
            };
4259

4260
            let alice_rng_seed = rng.random::<[u8; 32]>();
4261
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
4262
            let sync_challenge_from_alice = SyncChallenge::generate(
4263
                &block_notification_from_bob,
4264
                alice_tip.header().height,
4265
                alice_rng_clone.random(),
4266
            );
4267

4268
            println!(
4269
                "sync challenge from alice:\n{:?}",
4270
                sync_challenge_from_alice
4271
            );
4272

4273
            let sync_challenge_response_from_bob = bob
4274
                .lock_guard()
4275
                .await
4276
                .response_to_sync_challenge(sync_challenge_from_alice)
4277
                .await
4278
                .expect("should be able to respond to sync challenge");
4279

4280
            let alice_p2p_messages = Mock::new(vec![
4281
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4282
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
4283
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
4284
                    sync_challenge_response_from_bob,
4285
                ))),
4286
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4287
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
4288
                // The absence of a Write here checks that a 2nd challenge isn't sent
4289
                // when a successful was just received.
4290
                Action::Read(PeerMessage::Bye),
4291
            ]);
4292

4293
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
4294
                alice_peer_to_main_tx.clone(),
4295
                alice.clone(),
4296
                bob_socket_address,
4297
                alice_hsd,
4298
                false,
4299
                1,
4300
                bob_tip.header().timestamp,
4301
            );
4302
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
4303
            alice_peer_loop_handler
4304
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4305
                .await?;
4306

4307
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
4308
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
4309
            expected_anchor_mmra.append(bob_tip.hash());
4310
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
4311
                peer_address: bob_socket_address,
4312
                claimed_height: bob_tip.header().height,
4313
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
4314
                claimed_block_mmra: expected_anchor_mmra,
4315
            };
4316
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
4317
            assert_eq!(
4318
                expected_message_from_alice_peer_loop,
4319
                observed_message_from_alice_peer_loop
4320
            );
4321

4322
            Ok(())
4323
        }
4324
    }
4325
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc