• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 16318292757

16 Jul 2025 11:31AM UTC coverage: 72.439% (-0.04%) from 72.481%
16318292757

Pull #628

github

web-flow
Merge 11d55ce67 into cd1aad2bb
Pull Request #628: Dockerize

20666 of 28529 relevant lines covered (72.44%)

487039.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.61
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
// NOTE(review): not referenced in the visible part of this file; presumably
// the default number of blocks in a batch response -- confirm against users.
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
66
// Upper bound on the number of peers in a peer-list message: outgoing
// `PeerListResponse`s are truncated to this length, and a peer sending a
// longer list is punished for flooding.
const MAX_PEER_LIST_LENGTH: usize = 10;
67
// NOTE(review): not referenced in the visible part of this file; presumably
// the smallest accepted block-batch size -- confirm against users.
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
68

69
// Named return values for `handle_peer_message`, which reports `Ok(true)` when
// the connection should be closed and `Ok(false)` otherwise.
const KEEP_CONNECTION_ALIVE: bool = false;
70
const DISCONNECT_CONNECTION: bool = true;
71

72
// NOTE(review): presumably the numeric type of a peer's standing score; it is
// not used in the visible part of this file -- confirm.
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP.
75
///
76
/// Also handles messages from the main task over the main-to-peer-tasks
77
/// broadcast channel.
78
#[derive(Debug, Clone)]
79
pub struct PeerLoopHandler {
80
    /// Channel over which this peer task sends `PeerTaskToMain` messages
    /// (e.g. new blocks, peer-discovery answers) to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
81
    /// Handle to the shared global state; used for read and write lock
    /// guards and for reading the CLI arguments.
    global_state_lock: GlobalStateLock,
82
    /// Socket address of the connected peer; also the key under which the
    /// peer is looked up in the global peer map.
    peer_address: SocketAddr,
83
    /// NOTE(review): presumably the handshake data received from the peer
    /// when the connection was established; not read in the visible part of
    /// this file -- confirm.
    peer_handshake_data: HandshakeData,
84
    /// NOTE(review): presumably `true` if the peer initiated the connection;
    /// not read in the visible part of this file -- confirm.
    inbound_connection: bool,
85
    /// Distance of this peer; peers revealed by this peer are reported to the
    /// main task with distance `self.distance + 1`.
    distance: u8,
86
    /// Random number generator, used when generating sync challenges.
    rng: StdRng,
87
    /// Test-only override for the current time; see `now`.
    #[cfg(test)]
88
    mock_now: Option<Timestamp>,
89
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
28✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
28✔
94
        global_state_lock: GlobalStateLock,
28✔
95
        peer_address: SocketAddr,
28✔
96
        peer_handshake_data: HandshakeData,
28✔
97
        inbound_connection: bool,
28✔
98
        distance: u8,
28✔
99
    ) -> Self {
28✔
100
        Self {
28✔
101
            to_main_tx,
28✔
102
            global_state_lock,
28✔
103
            peer_address,
28✔
104
            peer_handshake_data,
28✔
105
            inbound_connection,
28✔
106
            distance,
28✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
28✔
108
            #[cfg(test)]
28✔
109
            mock_now: None,
28✔
110
        }
28✔
111
    }
28✔
112

113
    /// Test-only constructor: like [`Self::new`], but `now()` reports
    /// `mocked_time` instead of the actual current time, so that
    /// time-dependent behavior can be tested.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        // Build a regular handler, then override only the clock.
        Self {
            mock_now: Some(mocked_time),
            ..Self::new(
                to_main_tx,
                global_state_lock,
                peer_address,
                peer_handshake_data,
                inbound_connection,
                distance,
            )
        }
    }
135

136
    /// Replace this handler's random number generator with `rng`.
    ///
    /// Test-only. Supplying a generator with a known seed makes otherwise
    /// randomized behavior reproducible.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        self.rng = rng;
    }
143

144
    fn now(&self) -> Timestamp {
56✔
145
        #[cfg(not(test))]
146
        {
147
            Timestamp::now()
27✔
148
        }
149
        #[cfg(test)]
150
        {
151
            self.mock_now.unwrap_or(Timestamp::now())
29✔
152
        }
153
    }
56✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
199
        if sanction_result.is_err() {
18✔
200
            error!("Cannot reward banned peer");
×
201
        }
18✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
204
    }
19✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
255
    /// valid. Use with a list of blocks or a single block. When the
256
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
257
    /// list is the `i`th block. The parent of element zero in this list is
258
    /// `parent_of_first_block`.
259
    ///
260
    /// # Return Value
261
    ///  - `Err` when the connection should be closed;
262
    ///  - `Ok(None)` if some block is invalid
263
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
264
    ///    are not syncing;
265
    ///  - `Ok(None)` if the last block has insufficient height and we are
266
    ///    syncing;
267
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
268
    ///    highest height in the batch.
269
    ///
270
    /// A return value of Ok(Some(_)) means that the message was passed on to
271
    /// main loop.
272
    ///
273
    /// # Locking
274
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
275
    ///     `self.reward(..)`.
276
    ///
277
    /// # Panics
278
    ///
279
    ///  - Panics if called with the empty list.
280
    async fn handle_blocks(
8✔
281
        &mut self,
8✔
282
        received_blocks: Vec<Block>,
8✔
283
        parent_of_first_block: Block,
8✔
284
    ) -> Result<Option<BlockHeight>> {
20✔
285
        debug!(
20✔
286
            "attempting to validate {} {}",
×
287
            received_blocks.len(),
×
288
            if received_blocks.len() == 1 {
×
289
                "block"
×
290
            } else {
291
                "blocks"
×
292
            }
293
        );
294
        let now = self.now();
20✔
295
        debug!("validating with respect to current timestamp {now}");
20✔
296
        let mut previous_block = &parent_of_first_block;
20✔
297
        for new_block in &received_blocks {
48✔
298
            let new_block_has_proof_of_work = new_block.has_proof_of_work(
29✔
299
                self.global_state_lock.cli().network,
29✔
300
                previous_block.header(),
29✔
301
            );
302
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
29✔
303
            let new_block_is_valid = new_block
29✔
304
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
29✔
305
                .await;
29✔
306
            debug!("new block is valid? {new_block_is_valid}");
29✔
307
            if !new_block_has_proof_of_work {
29✔
308
                warn!(
1✔
309
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
310
                    new_block.kernel.header.height, self.peer_address
×
311
                );
312
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
313
                warn!(
1✔
314
                    "Proof of work should be {} (or more) but was [{}].",
×
315
                    previous_block.kernel.header.difficulty.target(),
×
316
                    new_block.hash().values().iter().join(", ")
×
317
                );
318
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
319
                    new_block.kernel.header.height,
1✔
320
                    new_block.hash(),
1✔
321
                )))
1✔
322
                .await?;
1✔
323
                warn!("Failed to validate block due to insufficient PoW");
1✔
324
                return Ok(None);
1✔
325
            } else if !new_block_is_valid {
28✔
326
                warn!(
×
327
                    "Received invalid block of height {} from peer with IP {}",
×
328
                    new_block.kernel.header.height, self.peer_address
×
329
                );
330
                self.punish(NegativePeerSanction::InvalidBlock((
×
331
                    new_block.kernel.header.height,
×
332
                    new_block.hash(),
×
333
                )))
×
334
                .await?;
×
335
                warn!("Failed to validate block: invalid block");
×
336
                return Ok(None);
×
337
            }
28✔
338
            info!(
28✔
339
                "Block with height {} is valid. mined: {}",
×
340
                new_block.kernel.header.height,
×
341
                new_block.kernel.header.timestamp.standard_format()
×
342
            );
343

344
            previous_block = new_block;
28✔
345
        }
346

347
        // evaluate the fork choice rule
348
        debug!("Checking last block's canonicity ...");
19✔
349
        let last_block = received_blocks.last().unwrap();
19✔
350
        let is_canonical = self
19✔
351
            .global_state_lock
19✔
352
            .lock_guard()
19✔
353
            .await
19✔
354
            .incoming_block_is_more_canonical(last_block);
19✔
355
        let last_block_height = last_block.header().height;
19✔
356
        let sync_mode_active_and_have_new_champion = self
19✔
357
            .global_state_lock
19✔
358
            .lock_guard()
19✔
359
            .await
19✔
360
            .net
361
            .sync_anchor
362
            .as_ref()
19✔
363
            .is_some_and(|x| {
19✔
364
                x.champion
×
365
                    .is_none_or(|(height, _)| height < last_block_height)
×
366
            });
×
367
        if !is_canonical && !sync_mode_active_and_have_new_champion {
19✔
368
            warn!(
1✔
369
                "Received {} blocks from peer but incoming blocks are less \
×
370
            canonical than current tip, or current sync-champion.",
×
371
                received_blocks.len()
×
372
            );
373
            return Ok(None);
1✔
374
        }
18✔
375

376
        // Send the new blocks to the main task which handles the state update
377
        // and storage to the database.
378
        let number_of_received_blocks = received_blocks.len();
18✔
379
        self.to_main_tx
18✔
380
            .send(PeerTaskToMain::NewBlocks(received_blocks))
18✔
381
            .await?;
18✔
382
        info!(
18✔
383
            "Updated block info by block from peer. block height {}",
×
384
            last_block_height
385
        );
386

387
        // Valuable, new, hard-to-produce information. Reward peer.
388
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
18✔
389
            .await?;
18✔
390

391
        Ok(Some(last_block_height))
18✔
392
    }
20✔
393

394
    /// Take a single block received from a peer and (attempt to) find a path
395
    /// between the received block and a common ancestor stored in the blocks
396
    /// database.
397
    ///
398
    /// This function attempts to find the parent of the received block, either
399
    /// by searching the database or by requesting it from a peer.
400
    ///  - If the parent is not stored, it is requested from the peer and the
401
    ///    received block is pushed to the fork reconciliation list for later
402
    ///    handling by this function. The fork reconciliation list starts out
403
    ///    empty, but grows as more parents are requested and transmitted.
404
    ///  - If the parent is found in the database, a) block handling continues:
405
    ///    the entire list of fork reconciliation blocks are passed down the
406
    ///    pipeline, potentially leading to a state update; and b) the fork
407
    ///    reconciliation list is cleared.
408
    ///
409
    /// Locking:
410
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
411
    ///     `self.reward(..)`.
412
    async fn try_ensure_path<S>(
32✔
413
        &mut self,
32✔
414
        received_block: Box<Block>,
32✔
415
        peer: &mut S,
32✔
416
        peer_state: &mut MutablePeerState,
32✔
417
    ) -> Result<()>
32✔
418
    where
32✔
419
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
32✔
420
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
32✔
421
        <S as TryStream>::Error: std::error::Error,
32✔
422
    {
32✔
423
        // Does the received block match the fork reconciliation list?
424
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
32✔
425
            peer_state.fork_reconciliation_blocks.last()
32✔
426
        {
427
            let valid = successor
10✔
428
                .is_valid(
10✔
429
                    received_block.as_ref(),
10✔
430
                    self.now(),
10✔
431
                    self.global_state_lock.cli().network,
10✔
432
                )
10✔
433
                .await;
10✔
434
            if !valid {
10✔
435
                warn!(
×
436
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
437
                        peer_state.fork_reconciliation_blocks.len() + 1
×
438
                    );
439
            }
10✔
440
            valid
10✔
441
        } else {
442
            true
22✔
443
        };
444

445
        // Are we running out of RAM?
446
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
32✔
447
            >= self.global_state_lock.cli().sync_mode_threshold;
32✔
448
        if too_many_blocks {
32✔
449
            warn!(
1✔
450
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
451
                peer_state.fork_reconciliation_blocks.len() + 1
×
452
            );
453
        }
31✔
454

455
        // Block mismatch or too many blocks: abort!
456
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
32✔
457
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
458
                received_block.header().height,
1✔
459
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
460
                received_block.hash(),
1✔
461
            )))
1✔
462
            .await?;
1✔
463
            peer_state.fork_reconciliation_blocks = vec![];
1✔
464
            return Ok(());
1✔
465
        }
31✔
466

467
        // otherwise, append
468
        peer_state.fork_reconciliation_blocks.push(*received_block);
31✔
469

470
        // Try fetch parent
471
        let received_block_header = *peer_state
31✔
472
            .fork_reconciliation_blocks
31✔
473
            .last()
31✔
474
            .unwrap()
31✔
475
            .header();
31✔
476

477
        let parent_digest = received_block_header.prev_block_digest;
31✔
478
        let parent_height = received_block_header.height.previous()
31✔
479
            .expect("transferred block must have previous height because genesis block cannot be transferred");
31✔
480
        debug!("Try ensure path: fetching parent block");
31✔
481
        let parent_block = self
31✔
482
            .global_state_lock
31✔
483
            .lock_guard()
31✔
484
            .await
31✔
485
            .chain
486
            .archival_state()
31✔
487
            .get_block(parent_digest)
31✔
488
            .await?;
31✔
489
        debug!(
31✔
490
            "Completed parent block fetching from DB: {}",
×
491
            if parent_block.is_some() {
×
492
                "found".to_string()
×
493
            } else {
494
                "not found".to_string()
×
495
            }
496
        );
497

498
        // If parent is not known (but not genesis) request it.
499
        let Some(parent_block) = parent_block else {
31✔
500
            if parent_height.is_genesis() {
11✔
501
                peer_state.fork_reconciliation_blocks.clear();
1✔
502
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
503
                return Ok(());
×
504
            }
10✔
505
            info!(
10✔
506
                "Parent not known: Requesting previous block with height {} from peer",
×
507
                parent_height
508
            );
509

510
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
511
                .await?;
10✔
512

513
            return Ok(());
10✔
514
        };
515

516
        // We want to treat the received fork reconciliation blocks (plus the
517
        // received block) in reverse order, from oldest to newest, because
518
        // they were requested from high to low block height.
519
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
20✔
520
        new_blocks.reverse();
20✔
521

522
        // Reset the fork resolution state since we got all the way back to a
523
        // block that we have.
524
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
20✔
525
        peer_state.fork_reconciliation_blocks.clear();
20✔
526

527
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
20✔
528
            // If `BlockNotification` was received during a block reconciliation
529
            // event, then the peer might have one (or more (unlikely)) blocks
530
            // that we do not have. We should thus request those blocks.
531
            if fork_reconciliation_event
18✔
532
                && peer_state.highest_shared_block_height > new_block_height
18✔
533
            {
534
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
535
                    peer_state.highest_shared_block_height,
1✔
536
                ))
1✔
537
                .await?;
1✔
538
            }
17✔
539
        }
2✔
540

541
        Ok(())
20✔
542
    }
32✔
543

544
    /// Handles a peer message and returns `Ok(true)` if the connection should be closed.
545
    /// Connection should also be closed if an error is returned.
546
    /// Otherwise, returns `Ok(false)`.
547
    ///
548
    /// Locking:
549
    ///   * Acquires `global_state_lock` for read.
550
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
551
    ///     `self.reward(..)`.
552
    async fn handle_peer_message<S>(
153✔
553
        &mut self,
153✔
554
        msg: PeerMessage,
153✔
555
        peer: &mut S,
153✔
556
        peer_state_info: &mut MutablePeerState,
153✔
557
    ) -> Result<bool>
153✔
558
    where
153✔
559
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
153✔
560
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
153✔
561
        <S as TryStream>::Error: std::error::Error,
153✔
562
    {
153✔
563
        debug!(
153✔
564
            "Received {} from peer {}",
×
565
            msg.get_type(),
×
566
            self.peer_address
567
        );
568
        match msg {
153✔
569
            PeerMessage::Bye => {
570
                // Note that the current peer is not removed from the global_state.peer_map here
571
                // but that this is done by the caller.
572
                info!("Got bye. Closing connection to peer");
42✔
573
                Ok(DISCONNECT_CONNECTION)
42✔
574
            }
575
            PeerMessage::PeerListRequest => {
576
                let peer_info = {
5✔
577
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
578

579
                    // We are interested in the address on which peers accept ingoing connections,
580
                    // not in the address in which they are connected to us. We are only interested in
581
                    // peers that accept incoming connections.
582
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
583
                        .global_state_lock
5✔
584
                        .lock_guard()
5✔
585
                        .await
5✔
586
                        .net
587
                        .peer_map
588
                        .values()
5✔
589
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
590
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
591
                        .map(|peer_info| {
8✔
592
                            (
8✔
593
                                // unwrap is safe bc of above `filter`
8✔
594
                                peer_info.listen_address().unwrap(),
8✔
595
                                peer_info.instance_id(),
8✔
596
                            )
8✔
597
                        })
8✔
598
                        .collect();
5✔
599

600
                    // We sort the returned list, so this function is easier to test
601
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
602
                    peer_info
5✔
603
                };
604

605
                debug!("Responding with: {:?}", peer_info);
5✔
606
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
607
                Ok(KEEP_CONNECTION_ALIVE)
5✔
608
            }
609
            PeerMessage::PeerListResponse(peers) => {
3✔
610
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
611

612
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
613
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
614
                        .await?;
×
615
                }
3✔
616
                self.to_main_tx
3✔
617
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
618
                        peers,
3✔
619
                        self.peer_address,
3✔
620
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
621
                        self.distance + 1,
3✔
622
                    )))
3✔
623
                    .await?;
3✔
624
                Ok(KEEP_CONNECTION_ALIVE)
3✔
625
            }
626
            PeerMessage::BlockNotificationRequest => {
627
                debug!("Got BlockNotificationRequest");
×
628

629
                peer.send(PeerMessage::BlockNotification(
×
630
                    self.global_state_lock
×
631
                        .lock_guard()
×
632
                        .await
×
633
                        .chain
634
                        .light_state()
×
635
                        .into(),
×
636
                ))
637
                .await?;
×
638

639
                Ok(KEEP_CONNECTION_ALIVE)
×
640
            }
641
            PeerMessage::BlockNotification(block_notification) => {
16✔
642
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
643

644
                let (tip_header, sync_anchor_is_set) = {
16✔
645
                    let state = self.global_state_lock.lock_guard().await;
16✔
646
                    (
16✔
647
                        *state.chain.light_state().header(),
16✔
648
                        state.net.sync_anchor.is_some(),
16✔
649
                    )
16✔
650
                };
651
                debug!(
16✔
652
                    "Got BlockNotification of height {}. Own height is {}",
×
653
                    block_notification.height, tip_header.height
654
                );
655

656
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
657
                let now = self.now();
16✔
658
                let time_since_latest_successful_challenge = peer_state_info
16✔
659
                    .successful_sync_challenge_response_time
16✔
660
                    .map(|then| now - then);
16✔
661
                let cooldown_expired = time_since_latest_successful_challenge
16✔
662
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
663
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
664
                    &tip_header,
16✔
665
                    block_notification.height,
16✔
666
                    block_notification.cumulative_proof_of_work,
16✔
667
                    sync_mode_threshold,
16✔
668
                );
669
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
670
                    debug!("sync mode criterion satisfied.");
1✔
671

672
                    if peer_state_info.sync_challenge.is_some() {
1✔
673
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
674
                        return Ok(KEEP_CONNECTION_ALIVE);
×
675
                    }
1✔
676

677
                    info!(
1✔
678
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
679
                    );
680
                    let challenge = SyncChallenge::generate(
1✔
681
                        &block_notification,
1✔
682
                        tip_header.height,
1✔
683
                        self.rng.random(),
1✔
684
                    );
685
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
686
                        challenge,
1✔
687
                        block_notification.cumulative_proof_of_work,
1✔
688
                        self.now(),
1✔
689
                    ));
1✔
690

691
                    debug!("sending challenge ...");
1✔
692
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
693

694
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
695
                }
15✔
696

697
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
698
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
699
                    < block_notification.cumulative_proof_of_work;
15✔
700

701
                debug!("block_is_new: {}", block_is_new);
15✔
702

703
                if block_is_new
15✔
704
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
705
                    && !sync_anchor_is_set
14✔
706
                    && !exceeds_sync_mode_threshold
14✔
707
                {
708
                    debug!(
13✔
709
                        "sending BlockRequestByHeight to peer for block with height {}",
×
710
                        block_notification.height
711
                    );
712
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
713
                        .await?;
13✔
714
                } else {
715
                    debug!(
2✔
716
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
717
                        block_notification.height,
718
                        block_is_new,
719
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
720
                    );
721
                }
722

723
                Ok(KEEP_CONNECTION_ALIVE)
15✔
724
            }
725
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
726
                let response = {
×
727
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
728

729
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
730

731
                    let response = self
2✔
732
                        .global_state_lock
2✔
733
                        .lock_guard()
2✔
734
                        .await
2✔
735
                        .response_to_sync_challenge(sync_challenge)
2✔
736
                        .await;
2✔
737

738
                    match response {
2✔
739
                        Ok(resp) => resp,
×
740
                        Err(e) => {
2✔
741
                            warn!("could not generate sync challenge response:\n{e}");
2✔
742
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
743
                                .await?;
2✔
744
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
745
                        }
746
                    }
747
                };
748

749
                info!(
×
750
                    "Responding to sync challenge from {}",
×
751
                    self.peer_address.ip()
×
752
                );
753
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
754
                    .await?;
×
755

756
                Ok(KEEP_CONNECTION_ALIVE)
×
757
            }
758
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
759
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
760

761
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
762
                info!(
1✔
763
                    "Got sync challenge response from {}",
×
764
                    self.peer_address.ip()
×
765
                );
766

767
                // The purpose of the sync challenge and sync challenge response
768
                // is to avoid going into sync mode based on a malicious target
769
                // fork. Instead of verifying that the claimed proof-of-work
770
                // number is correct (which would require sending and verifying,
771
                // at least, all blocks between luca (whatever that is) and the
772
                // claimed tip), we use a heuristic that requires less
773
                // communication and less verification work. The downside of
774
                // using a heuristic here is a nonzero false positive and false
775
                // negative rate. Note that the false negative event
776
                // (maliciously sending someone into sync mode based on a bogus
777
                // fork) still requires a significant amount of work from the
778
                // attacker, *in addition* to being lucky. Also, down the line
779
                // succinctness (and more specifically, recursive block
780
                // validation) obviates this entire subprotocol.
781

782
                // Did we issue a challenge?
783
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
784
                    warn!("Sync challenge response was not prompted.");
×
785
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
786
                        .await?;
×
787
                    return Ok(KEEP_CONNECTION_ALIVE);
×
788
                };
789

790
                // Reset the challenge, regardless of the response's success.
791
                peer_state_info.sync_challenge = None;
1✔
792

793
                // Does response match issued challenge?
794
                if !challenge_response
1✔
795
                    .matches(self.global_state_lock.cli().network, issued_challenge)
1✔
796
                {
797
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
798
                        .await?;
×
799
                    return Ok(KEEP_CONNECTION_ALIVE);
×
800
                }
1✔
801

802
                // Does response verify?
803
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
804
                let now = self.now();
1✔
805
                if !challenge_response
1✔
806
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
807
                    .await
1✔
808
                {
809
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
810
                        .await?;
×
811
                    return Ok(KEEP_CONNECTION_ALIVE);
×
812
                }
1✔
813

814
                // Does cumulative proof-of-work evolve reasonably?
815
                let own_tip_header = *self
1✔
816
                    .global_state_lock
1✔
817
                    .lock_guard()
1✔
818
                    .await
1✔
819
                    .chain
820
                    .light_state()
1✔
821
                    .header();
1✔
822
                if !challenge_response
1✔
823
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
824
                {
825
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
826
                        .await?;
×
827
                    return Ok(KEEP_CONNECTION_ALIVE);
×
828
                }
1✔
829

830
                // Is there some specific (*i.e.*, not aggregate) proof of work?
831
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
832
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
833
                        .await?;
×
834
                    return Ok(KEEP_CONNECTION_ALIVE);
×
835
                }
1✔
836

837
                // Did it come in time?
838
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
839
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
840
                        .await?;
×
841
                    return Ok(KEEP_CONNECTION_ALIVE);
×
842
                }
1✔
843

844
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
845
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
846

847
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
848
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
849

850
                // Inform main loop
851
                self.to_main_tx
1✔
852
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
853
                        peer_address: self.peer_address,
1✔
854
                        claimed_height: claimed_tip_height,
1✔
855
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
856
                        claimed_block_mmra: sync_mmra_anchor,
1✔
857
                    })
1✔
858
                    .await?;
1✔
859

860
                Ok(KEEP_CONNECTION_ALIVE)
1✔
861
            }
862
            PeerMessage::BlockRequestByHash(block_digest) => {
×
863
                let block = self
×
864
                    .global_state_lock
×
865
                    .lock_guard()
×
866
                    .await
×
867
                    .chain
868
                    .archival_state()
×
869
                    .get_block(block_digest)
×
870
                    .await?;
×
871

872
                match block {
×
873
                    None => {
874
                        // TODO: Consider punishing here
875
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
876
                        Ok(KEEP_CONNECTION_ALIVE)
×
877
                    }
878
                    Some(b) => {
×
879
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
880
                            .await?;
×
881
                        Ok(KEEP_CONNECTION_ALIVE)
×
882
                    }
883
                }
884
            }
885
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
886
                let block_response = {
15✔
887
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
888

889
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
890

891
                    let canonical_block_digest = self
16✔
892
                        .global_state_lock
16✔
893
                        .lock_guard()
16✔
894
                        .await
16✔
895
                        .chain
896
                        .archival_state()
16✔
897
                        .archival_block_mmr
898
                        .ammr()
16✔
899
                        .try_get_leaf(block_height.into())
16✔
900
                        .await;
16✔
901

902
                    let Some(canonical_block_digest) = canonical_block_digest else {
16✔
903
                        let own_tip_height = self
1✔
904
                            .global_state_lock
1✔
905
                            .lock_guard()
1✔
906
                            .await
1✔
907
                            .chain
908
                            .light_state()
1✔
909
                            .header()
1✔
910
                            .height;
911
                        warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
912
                        self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
913
                            .await?;
1✔
914

915
                        return Ok(KEEP_CONNECTION_ALIVE);
1✔
916
                    };
917

918
                    let canonical_chain_block: Block = self
15✔
919
                        .global_state_lock
15✔
920
                        .lock_guard()
15✔
921
                        .await
15✔
922
                        .chain
923
                        .archival_state()
15✔
924
                        .get_block(canonical_block_digest)
15✔
925
                        .await?
15✔
926
                        .unwrap();
15✔
927

928
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
929
                };
930

931
                debug!("Sending block");
15✔
932
                peer.send(block_response).await?;
15✔
933
                debug!("Sent block");
15✔
934
                Ok(KEEP_CONNECTION_ALIVE)
15✔
935
            }
936
            PeerMessage::Block(t_block) => {
32✔
937
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
938

939
                info!(
32✔
940
                    "Got new block from peer {}, height {}, mined {}",
×
941
                    self.peer_address,
942
                    t_block.header.height,
943
                    t_block.header.timestamp.standard_format()
×
944
                );
945
                let new_block_height = t_block.header.height;
32✔
946

947
                let block = match Block::try_from(*t_block) {
32✔
948
                    Ok(block) => Box::new(block),
32✔
949
                    Err(e) => {
×
950
                        warn!("Peer sent invalid block: {e:?}");
×
951
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
952
                            .await?;
×
953

954
                        return Ok(KEEP_CONNECTION_ALIVE);
×
955
                    }
956
                };
957

958
                // Update the value for the highest known height that peer possesses iff
959
                // we are not in a fork reconciliation state.
960
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
961
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
962
                }
22✔
963

964
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
965

966
                // Reward happens as part of `try_ensure_path`
967

968
                Ok(KEEP_CONNECTION_ALIVE)
31✔
969
            }
970
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
971
                known_blocks,
8✔
972
                max_response_len,
8✔
973
                anchor,
8✔
974
            }) => {
975
                debug!(
8✔
976
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
977
                    self.peer_address
978
                );
979

980
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
981
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
982
                        .await?;
×
983

984
                    return Ok(KEEP_CONNECTION_ALIVE);
×
985
                }
8✔
986

987
                // The last block in the peer's list of known blocks is the
988
                // earliest block, block with lowest height, the peer has
989
                // requested. If it does not belong to canonical chain, none of
990
                // the later ones will. So we can do an early abort in that case.
991
                let least_preferred = match known_blocks.last() {
8✔
992
                    Some(least_preferred) => *least_preferred,
8✔
993
                    None => {
994
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
995
                            .await?;
×
996

997
                        return Ok(KEEP_CONNECTION_ALIVE);
×
998
                    }
999
                };
1000

1001
                let state = self.global_state_lock.lock_guard().await;
8✔
1002
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1003
                let luca_is_known = state
8✔
1004
                    .chain
8✔
1005
                    .archival_state()
8✔
1006
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1007
                    .await;
8✔
1008
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1009
                    drop(state);
×
1010
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1011
                        .await?;
×
1012
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1013

1014
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1015
                }
8✔
1016

1017
                // Happy case: At least *one* of the blocks referenced by peer
1018
                // is known to us.
1019
                let first_block_in_response = {
8✔
1020
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1021
                    for block_digest in known_blocks {
10✔
1022
                        if state
10✔
1023
                            .chain
10✔
1024
                            .archival_state()
10✔
1025
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1026
                            .await
10✔
1027
                        {
1028
                            let height = state
8✔
1029
                                .chain
8✔
1030
                                .archival_state()
8✔
1031
                                .get_block_header(block_digest)
8✔
1032
                                .await
8✔
1033
                                .unwrap()
8✔
1034
                                .height;
1035
                            first_block_in_response = Some(height);
8✔
1036
                            debug!(
8✔
1037
                                "Found block in canonical chain for batch response: {}",
×
1038
                                block_digest
1039
                            );
1040
                            break;
8✔
1041
                        }
2✔
1042
                    }
1043

1044
                    first_block_in_response
8✔
1045
                        .expect("existence of LUCA should have been established already.")
8✔
1046
                };
1047

1048
                debug!(
8✔
1049
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1050
                 Now building response from that height."
×
1051
                );
1052

1053
                // Get the relevant blocks, at most batch-size many, descending from the
1054
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1055
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1056
                let max_response_len = cmp::min(
8✔
1057
                    max_response_len,
8✔
1058
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1059
                );
1060
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1061
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1062

1063
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1064
                let response_start_height: u64 = first_block_in_response.into();
8✔
1065
                let mut i: u64 = 1;
8✔
1066
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1067
                    let block_height = response_start_height + i;
31✔
1068
                    match state
31✔
1069
                        .chain
31✔
1070
                        .archival_state()
31✔
1071
                        .archival_block_mmr
31✔
1072
                        .ammr()
31✔
1073
                        .try_get_leaf(block_height)
31✔
1074
                        .await
31✔
1075
                    {
1076
                        Some(digest) => {
23✔
1077
                            digests_of_returned_blocks.push(digest);
23✔
1078
                        }
23✔
1079
                        None => break,
8✔
1080
                    }
1081
                    i += 1;
23✔
1082
                }
1083

1084
                let mut returned_blocks: Vec<Block> =
8✔
1085
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1086
                for block_digest in digests_of_returned_blocks {
31✔
1087
                    let block = state
23✔
1088
                        .chain
23✔
1089
                        .archival_state()
23✔
1090
                        .get_block(block_digest)
23✔
1091
                        .await?
23✔
1092
                        .unwrap();
23✔
1093
                    returned_blocks.push(block);
23✔
1094
                }
1095

1096
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1097

1098
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1099
                drop(state);
8✔
1100

1101
                let Some(response) = response else {
8✔
1102
                    warn!("Unable to satisfy batch-block request");
×
1103
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1104
                        .await?;
×
1105
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1106
                };
1107

1108
                debug!("Returning {} blocks in batch response", response.len());
8✔
1109

1110
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1111
                peer.send(response).await?;
8✔
1112

1113
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1114
            }
1115
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1116
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1117

1118
                debug!(
×
1119
                    "handling block response batch with {} blocks",
×
1120
                    authenticated_blocks.len()
×
1121
                );
1122

1123
                // (Alan:) why is there even a minimum?
1124
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1125
                    warn!("Got smaller batch response than allowed");
×
1126
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1127
                        .await?;
×
1128
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1129
                }
×
1130

1131
                // Verify that we are in fact in syncing mode
1132
                // TODO: Separate peer messages into those allowed under syncing
1133
                // and those that are not
1134
                let Some(sync_anchor) = self
×
1135
                    .global_state_lock
×
1136
                    .lock_guard()
×
1137
                    .await
×
1138
                    .net
1139
                    .sync_anchor
1140
                    .clone()
×
1141
                else {
1142
                    warn!("Received a batch of blocks without being in syncing mode");
×
1143
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1144
                        .await?;
×
1145
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1146
                };
1147

1148
                // Verify that the response matches the current state
1149
                // We get the latest block from the DB here since this message is
1150
                // only valid for archival nodes.
1151
                let (first_block, _) = &authenticated_blocks[0];
×
1152
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1153
                let most_canonical_own_block_match: Option<Block> = self
×
1154
                    .global_state_lock
×
1155
                    .lock_guard()
×
1156
                    .await
×
1157
                    .chain
1158
                    .archival_state()
×
1159
                    .get_block(first_blocks_parent_digest)
×
1160
                    .await
×
1161
                    .expect("Block lookup must succeed");
×
1162
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1163
                    Some(block) => block,
×
1164
                    None => {
1165
                        warn!("Got batch response with invalid start block");
×
1166
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1167
                            .await?;
×
1168
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1169
                    }
1170
                };
1171

1172
                // Convert all blocks to Block objects
1173
                debug!(
×
1174
                    "Found own block of height {} to match received batch",
×
1175
                    most_canonical_own_block_match.kernel.header.height
×
1176
                );
1177
                let mut received_blocks = vec![];
×
1178
                for (t_block, membership_proof) in authenticated_blocks {
×
1179
                    let Ok(block) = Block::try_from(t_block) else {
×
1180
                        warn!("Received invalid transfer block from peer");
×
1181
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1182
                            .await?;
×
1183
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1184
                    };
1185

1186
                    if !membership_proof.verify(
×
1187
                        block.header().height.into(),
×
1188
                        block.hash(),
×
1189
                        &sync_anchor.block_mmr.peaks(),
×
1190
                        sync_anchor.block_mmr.num_leafs(),
×
1191
                    ) {
×
1192
                        warn!("Authentication of received block fails relative to anchor");
×
1193
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1194
                            .await?;
×
1195
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1196
                    }
×
1197

1198
                    received_blocks.push(block);
×
1199
                }
1200

1201
                // Get the latest block that we know of and handle all received blocks
1202
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1203
                    .await?;
×
1204

1205
                // Reward happens as part of `handle_blocks`.
1206

1207
                Ok(KEEP_CONNECTION_ALIVE)
×
1208
            }
1209
            PeerMessage::UnableToSatisfyBatchRequest => {
1210
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1211
                warn!(
×
1212
                    "Peer {} reports inability to satisfy batch request.",
×
1213
                    self.peer_address
1214
                );
1215

1216
                Ok(KEEP_CONNECTION_ALIVE)
×
1217
            }
1218
            PeerMessage::Handshake(_) => {
1219
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1220

1221
                // The handshake should have been sent during connection
1222
                // initialization. Here it is out of order at best, malicious at
1223
                // worst.
1224
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1225
                Ok(KEEP_CONNECTION_ALIVE)
×
1226
            }
1227
            PeerMessage::ConnectionStatus(_) => {
1228
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1229

1230
                // The connection status should have been sent during connection
1231
                // initialization. Here it is out of order at best, malicious at
1232
                // worst.
1233

1234
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1235
                Ok(KEEP_CONNECTION_ALIVE)
×
1236
            }
1237
            PeerMessage::Transaction(transaction) => {
7✔
1238
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
7✔
1239

1240
                debug!(
7✔
1241
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1242
                    transaction.kernel.inputs.len(),
×
1243
                    transaction.kernel.outputs.len(),
×
1244
                    transaction.kernel.mutator_set_hash
×
1245
                );
1246

1247
                let transaction: Transaction = (*transaction).into();
7✔
1248

1249
                // 1. If transaction is invalid, punish.
1250
                if !transaction
7✔
1251
                    .is_valid(self.global_state_lock.cli().network)
7✔
1252
                    .await
7✔
1253
                {
1254
                    warn!("Received invalid tx");
×
1255
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1256
                        .await?;
×
1257
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1258
                }
7✔
1259

1260
                // 2. If transaction has coinbase, punish.
1261
                // Transactions received from peers have not been mined yet.
1262
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
1263
                if transaction.kernel.coinbase.is_some() {
7✔
1264
                    warn!("Received non-mined transaction with coinbase.");
×
1265
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1266
                        .await?;
×
1267
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1268
                }
7✔
1269

1270
                // 3. If negative fee, punish.
1271
                if transaction.kernel.fee.is_negative() {
7✔
1272
                    warn!("Received negative-fee transaction.");
×
1273
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1274
                        .await?;
×
1275
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1276
                }
7✔
1277

1278
                // 4. If transaction is already known, ignore.
1279
                if self
7✔
1280
                    .global_state_lock
7✔
1281
                    .lock_guard()
7✔
1282
                    .await
7✔
1283
                    .mempool
1284
                    .contains_with_higher_proof_quality(
7✔
1285
                        transaction.kernel.txid(),
7✔
1286
                        transaction.proof.proof_quality()?,
7✔
1287
                        transaction.kernel.mutator_set_hash,
7✔
1288
                    )
1289
                {
1290
                    warn!("Received transaction that was already known");
×
1291

1292
                    // We received a transaction that we *probably* haven't requested.
1293
                    // Consider punishing here, if this is abused.
1294
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1295
                }
7✔
1296

1297
                // 5. If transaction is not confirmable, punish.
1298
                let (tip, mutator_set_accumulator_after) = {
7✔
1299
                    let state = self.global_state_lock.lock_guard().await;
7✔
1300

1301
                    (
7✔
1302
                        state.chain.light_state().hash(),
7✔
1303
                        state
7✔
1304
                            .chain
7✔
1305
                            .light_state()
7✔
1306
                            .mutator_set_accumulator_after()
7✔
1307
                            .expect("Block from state must have mutator set after"),
7✔
1308
                    )
7✔
1309
                };
1310
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
7✔
1311
                    warn!(
×
1312
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1313
                        transaction.kernel.txid()
×
1314
                    );
1315
                    // get fine-grained error code for informative logging
1316
                    let confirmability_error_code = transaction
×
1317
                        .kernel
×
1318
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1319
                    match confirmability_error_code {
×
1320
                        Ok(_) => unreachable!(),
×
1321
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1322
                            warn!("invalid removal record (at index {index})");
×
1323
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1324
                            let removal_record_error_code = invalid_removal_record
×
1325
                                .validate_inner(&mutator_set_accumulator_after);
×
1326
                            debug!(
×
1327
                                "Absolute index set of removal record {index}: {:?}",
×
1328
                                invalid_removal_record.absolute_indices
1329
                            );
1330
                            match removal_record_error_code {
×
1331
                                Ok(_) => unreachable!(),
×
1332
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1333
                                    debug!("invalid because membership proof is missing");
×
1334
                                }
1335
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1336
                                    chunk_index,
×
1337
                                }) => {
1338
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1339
                                }
1340
                            };
1341
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1342
                                .await?;
×
1343
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1344
                        }
1345
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1346
                            warn!("duplicate inputs");
×
1347
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1348
                                .await?;
×
1349
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1350
                        }
1351
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1352
                            warn!("already spent input (at index {index})");
×
1353
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1354
                                .await?;
×
1355
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1356
                        }
1357
                    };
1358
                }
7✔
1359

1360
                // If transaction cannot be applied to mutator set, punish.
1361
                // I don't think this can happen when above checks pass but we include
1362
                // the check to ensure that transaction can be applied.
1363
                let ms_update = MutatorSetUpdate::new(
7✔
1364
                    transaction.kernel.inputs.clone(),
7✔
1365
                    transaction.kernel.outputs.clone(),
7✔
1366
                );
1367
                let can_apply = ms_update
7✔
1368
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
7✔
1369
                    .is_ok();
7✔
1370
                if !can_apply {
7✔
1371
                    warn!("Cannot apply transaction to current mutator set.");
×
1372
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1373
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1374
                        .await?;
×
1375
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1376
                }
7✔
1377

1378
                let tx_timestamp = transaction.kernel.timestamp;
7✔
1379

1380
                // 6. Ignore if transaction is too old
1381
                let now = self.now();
7✔
1382
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
7✔
1383
                    // TODO: Consider punishing here
1384
                    warn!("Received too old tx");
×
1385
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1386
                }
7✔
1387

1388
                // 7. Ignore if transaction is too far into the future
1389
                if tx_timestamp
7✔
1390
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
7✔
1391
                {
1392
                    // TODO: Consider punishing here
1393
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1394
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1395
                }
7✔
1396

1397
                // Otherwise, relay to main
1398
                let pt2m_transaction = PeerTaskToMainTransaction {
7✔
1399
                    transaction,
7✔
1400
                    confirmable_for_block: tip,
7✔
1401
                };
7✔
1402
                self.to_main_tx
7✔
1403
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
7✔
1404
                    .await?;
7✔
1405

1406
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1407
            }
1408
            PeerMessage::TransactionNotification(tx_notification) => {
14✔
1409
                // addresses #457
1410
                // new scope for state read-lock to avoid holding across peer.send()
1411
                {
1412
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
14✔
1413

1414
                    // 1. Ignore if we already know this transaction, and
1415
                    // the proof quality is not higher than what we already know.
1416
                    let state = self.global_state_lock.lock_guard().await;
14✔
1417
                    let transaction_of_same_or_higher_proof_quality_is_known =
14✔
1418
                        state.mempool.contains_with_higher_proof_quality(
14✔
1419
                            tx_notification.txid,
14✔
1420
                            tx_notification.proof_quality,
14✔
1421
                            tx_notification.mutator_set_hash,
14✔
1422
                        );
1423
                    if transaction_of_same_or_higher_proof_quality_is_known {
14✔
1424
                        debug!("transaction with same or higher proof quality was already known");
7✔
1425
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1426
                    }
7✔
1427

1428
                    // Only accept transactions that do not require executing
1429
                    // `update`.
1430
                    if state
7✔
1431
                        .chain
7✔
1432
                        .light_state()
7✔
1433
                        .mutator_set_accumulator_after()
7✔
1434
                        .expect("Block from state must have mutator set after")
7✔
1435
                        .hash()
7✔
1436
                        != tx_notification.mutator_set_hash
7✔
1437
                    {
1438
                        debug!("transaction refers to non-canonical mutator set state");
×
1439
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1440
                    }
7✔
1441
                }
1442

1443
                // 2. Request the actual `Transaction` from peer
1444
                debug!("requesting transaction from peer");
7✔
1445
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
7✔
1446
                    .await?;
7✔
1447

1448
                Ok(KEEP_CONNECTION_ALIVE)
7✔
1449
            }
1450
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1451
                let state = self.global_state_lock.lock_guard().await;
5✔
1452
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1453
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1454
                };
1455

1456
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1457
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1458
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1459
                };
1460

1461
                // Drop state immediately to prevent holding over a response.
1462
                drop(state);
4✔
1463

1464
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1465
                    .await?;
4✔
1466

1467
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1468
            }
1469
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1470
                let verdict = self
1✔
1471
                    .global_state_lock
1✔
1472
                    .lock_guard()
1✔
1473
                    .await
1✔
1474
                    .favor_incoming_block_proposal_legacy(
1✔
1475
                        block_proposal_notification.height,
1✔
1476
                        block_proposal_notification.guesser_fee,
1✔
1477
                    );
1478
                match verdict {
1✔
1479
                    Ok(_) => {
1480
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1481
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1482
                        ))
1✔
1483
                        .await?
1✔
1484
                    }
1485
                    Err(reject_reason) => {
×
1486
                        info!(
×
1487
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1488
                        {}. Reason:\n{reject_reason}",
×
1489
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1490
                        self.peer_address
1491
                    )
1492
                    }
1493
                }
1494

1495
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1496
            }
1497
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1498
                let matching_proposal = self
×
1499
                    .global_state_lock
×
1500
                    .lock_guard()
×
1501
                    .await
×
1502
                    .mining_state
1503
                    .block_proposal
1504
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1505
                    .map(|x| x.to_owned());
×
1506
                if let Some(proposal) = matching_proposal {
×
1507
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1508
                        .await?;
×
1509
                } else {
1510
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1511
                        .await?;
×
1512
                }
1513

1514
                Ok(KEEP_CONNECTION_ALIVE)
×
1515
            }
1516
            PeerMessage::BlockProposal(new_proposal) => {
1✔
1517
                info!("Got block proposal from peer.");
1✔
1518

1519
                // Is the proposal valid?
1520
                // Lock needs to be held here because race conditions: otherwise
1521
                // the block proposal that was validated might not match with
1522
                // the one whose favorability is being computed.
1523
                let state = self.global_state_lock.lock_guard().await;
1✔
1524
                let tip = state.chain.light_state();
1✔
1525
                let proposal_is_valid = new_proposal
1✔
1526
                    .is_valid(tip, self.now(), self.global_state_lock.cli().network)
1✔
1527
                    .await;
1✔
1528
                if !proposal_is_valid {
1✔
1529
                    drop(state);
×
1530
                    self.punish(NegativePeerSanction::InvalidBlockProposal)
×
1531
                        .await?;
×
1532
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1533
                }
1✔
1534

1535
                // Is block proposal favorable?
1536
                let is_favorable = state.favor_incoming_block_proposal(
1✔
1537
                    new_proposal.header().prev_block_digest,
1✔
1538
                    new_proposal
1✔
1539
                        .total_guesser_reward()
1✔
1540
                        .expect("Block was validated"),
1✔
1541
                );
1✔
1542
                drop(state);
1✔
1543

1544
                if let Err(rejection_reason) = is_favorable {
1✔
1545
                    match rejection_reason {
×
1546
                        // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1547
                        BlockProposalRejectError::InsufficientFee { current, received }
×
1548
                            if Some(received) == current =>
×
1549
                        {
1550
                            debug!("ignoring new block proposal because the fee is equal to the present one");
×
1551
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1552
                        }
1553
                        _ => {
1554
                            warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1555
                            self.punish(NegativePeerSanction::NonFavorableBlockProposal)
×
1556
                                .await?;
×
1557
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1558
                        }
1559
                    }
1560
                };
1✔
1561

1562
                self.send_to_main(PeerTaskToMain::BlockProposal(new_proposal), line!())
1✔
1563
                    .await?;
1✔
1564

1565
                // Valuable, new, hard-to-produce information. Reward peer.
1566
                self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1567

1568
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1569
            }
1570
        }
1571
    }
153✔
1572

1573
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1574
    ///
1575
    /// the channel could potentially fill up in which case the send() will
1576
    /// block until there is capacity.  we wrap the send() so we can log if
1577
    /// that ever happens to the extent it passes slow-scope threshold.
1578
    async fn send_to_main(
1✔
1579
        &self,
1✔
1580
        msg: PeerTaskToMain,
1✔
1581
        line: u32,
1✔
1582
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1583
        // we measure across the send() in case the channel ever fills up.
1584
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1585

1586
        self.to_main_tx.send(msg).await
1✔
1587
    }
1✔
1588

1589
    /// Handle message from main task. The boolean return value indicates if
1590
    /// the connection should be closed.
1591
    ///
1592
    /// Locking:
1593
    ///   * acquires `global_state_lock` for write via Self::punish()
1594
    async fn handle_main_task_message<S>(
29✔
1595
        &mut self,
29✔
1596
        msg: MainToPeerTask,
29✔
1597
        peer: &mut S,
29✔
1598
        peer_state_info: &mut MutablePeerState,
29✔
1599
    ) -> Result<bool>
29✔
1600
    where
29✔
1601
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
29✔
1602
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
29✔
1603
        <S as TryStream>::Error: std::error::Error,
29✔
1604
    {
29✔
1605
        debug!("Handling {} message from main in peer loop", msg.get_type());
29✔
1606
        match msg {
29✔
1607
            MainToPeerTask::Block(block) => {
23✔
1608
                // We don't currently differentiate whether a new block came from a peer, or from our
1609
                // own miner. It's always shared through this logic.
1610
                let new_block_height = block.kernel.header.height;
23✔
1611
                if new_block_height > peer_state_info.highest_shared_block_height {
23✔
1612
                    debug!("Sending PeerMessage::BlockNotification");
12✔
1613
                    peer_state_info.highest_shared_block_height = new_block_height;
12✔
1614
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
12✔
1615
                        .await?;
12✔
1616
                    debug!("Sent PeerMessage::BlockNotification");
12✔
1617
                }
11✔
1618
                Ok(KEEP_CONNECTION_ALIVE)
23✔
1619
            }
1620
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1621
                // Only ask one of the peers about the batch of blocks
1622
                if batch_block_request.peer_addr_target != self.peer_address {
×
1623
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1624
                }
×
1625

1626
                let max_response_len = std::cmp::min(
×
1627
                    STANDARD_BLOCK_BATCH_SIZE,
1628
                    self.global_state_lock.cli().sync_mode_threshold,
×
1629
                );
1630

1631
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1632
                    known_blocks: batch_block_request.known_blocks,
×
1633
                    max_response_len,
×
1634
                    anchor: batch_block_request.anchor_mmr,
×
1635
                }))
×
1636
                .await?;
×
1637

1638
                Ok(KEEP_CONNECTION_ALIVE)
×
1639
            }
1640
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1641
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1642

1643
                if self.peer_address != socket_addr {
×
1644
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1645
                }
×
1646

1647
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1648
                    .await?;
×
1649

1650
                // If this peer failed the last synchronization attempt, we only
1651
                // sanction, we don't disconnect.
1652
                Ok(KEEP_CONNECTION_ALIVE)
×
1653
            }
1654
            MainToPeerTask::MakePeerDiscoveryRequest => {
1655
                peer.send(PeerMessage::PeerListRequest).await?;
×
1656
                Ok(KEEP_CONNECTION_ALIVE)
×
1657
            }
1658
            MainToPeerTask::Disconnect(peer_address) => {
×
1659
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1660

1661
                // Only disconnect from the peer the main task requested a disconnect for.
1662
                if peer_address != self.peer_address {
×
1663
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1664
                }
×
1665
                self.register_peer_disconnection().await;
×
1666

1667
                Ok(DISCONNECT_CONNECTION)
×
1668
            }
1669
            MainToPeerTask::DisconnectAll() => {
1670
                self.register_peer_disconnection().await;
×
1671

1672
                Ok(DISCONNECT_CONNECTION)
×
1673
            }
1674
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1675
                if target_socket_addr == self.peer_address {
×
1676
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1677
                }
×
1678
                Ok(KEEP_CONNECTION_ALIVE)
×
1679
            }
1680
            MainToPeerTask::TransactionNotification(transaction_notification) => {
6✔
1681
                debug!("Sending PeerMessage::TransactionNotification");
6✔
1682
                peer.send(PeerMessage::TransactionNotification(
6✔
1683
                    transaction_notification,
6✔
1684
                ))
6✔
1685
                .await?;
6✔
1686
                debug!("Sent PeerMessage::TransactionNotification");
6✔
1687
                Ok(KEEP_CONNECTION_ALIVE)
6✔
1688
            }
1689
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1690
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1691
                peer.send(PeerMessage::BlockProposalNotification(
×
1692
                    block_proposal_notification,
×
1693
                ))
×
1694
                .await?;
×
1695
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1696
                Ok(KEEP_CONNECTION_ALIVE)
×
1697
            }
1698
        }
1699
    }
29✔
1700

1701
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1702
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1703
    async fn run<S>(
50✔
1704
        &mut self,
50✔
1705
        mut peer: S,
50✔
1706
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
50✔
1707
        peer_state_info: &mut MutablePeerState,
50✔
1708
    ) -> Result<()>
50✔
1709
    where
50✔
1710
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
50✔
1711
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
50✔
1712
        <S as TryStream>::Error: std::error::Error,
50✔
1713
    {
50✔
1714
        loop {
1715
            select! {
189✔
1716
                // Handle peer messages
1717
                peer_message = peer.try_next() => {
189✔
1718
                    let peer_address = self.peer_address;
154✔
1719
                    let peer_message = match peer_message {
154✔
1720
                        Ok(message) => message,
154✔
1721
                        Err(err) => {
×
1722
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
1723
                            error!("{msg}. Error: {err}");
×
1724
                            bail!("{msg}. Closing connection.");
×
1725
                        }
1726
                    };
1727
                    let Some(peer_message) = peer_message else {
154✔
1728
                        info!("Peer {peer_address} closed connection.");
1✔
1729
                        break;
1✔
1730
                    };
1731

1732
                    let syncing =
153✔
1733
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
153✔
1734
                    let message_type = peer_message.get_type();
153✔
1735
                    if peer_message.ignore_during_sync() && syncing {
153✔
1736
                        debug!(
×
1737
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1738
                        );
1739
                        continue;
×
1740
                    }
153✔
1741
                    if peer_message.ignore_when_not_sync() && !syncing {
153✔
1742
                        debug!(
×
1743
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1744
                        );
1745
                        continue;
×
1746
                    }
153✔
1747

1748
                    match self
153✔
1749
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
153✔
1750
                        .await
153✔
1751
                    {
1752
                        Ok(false) => {}
110✔
1753
                        Ok(true) => {
1754
                            info!("Closing connection to {peer_address}");
42✔
1755
                            break;
42✔
1756
                        }
1757
                        Err(err) => {
1✔
1758
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1759
                            bail!("{err}");
1✔
1760
                        }
1761
                    };
1762
                }
1763

1764
                // Handle messages from main task
1765
                main_msg_res = from_main_rx.recv() => {
189✔
1766
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
29✔
1767
                        let err_msg = format!("Failed to read from main loop: {err}");
×
1768
                        error!(err_msg);
×
1769
                        panic!("{err_msg}");
×
1770
                    });
1771
                    let close_connection = self
29✔
1772
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
29✔
1773
                        .await
29✔
1774
                        .unwrap_or_else(|err| {
29✔
1775
                            warn!("handle_main_task_message returned an error: {err}");
×
1776
                            true
×
1777
                        });
×
1778

1779
                    if close_connection {
29✔
1780
                        info!(
×
1781
                            "handle_main_task_message is closing the connection to {}",
×
1782
                            self.peer_address
1783
                        );
1784
                        break;
×
1785
                    }
29✔
1786
                }
1787
            }
1788
        }
1789

1790
        Ok(())
43✔
1791
    }
44✔
1792

1793
    /// Function called before entering the peer loop. Reads the potentially stored
1794
    /// peer standing from the database and does other book-keeping before entering
1795
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1796
    /// accepted for a connection for this loop to be entered. So we don't need
1797
    /// to check the standing again.
1798
    ///
1799
    /// Locking:
1800
    ///   * acquires `global_state_lock` for write
1801
    pub(crate) async fn run_wrapper<S>(
38✔
1802
        &mut self,
38✔
1803
        mut peer: S,
38✔
1804
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
38✔
1805
    ) -> Result<()>
38✔
1806
    where
38✔
1807
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
38✔
1808
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
38✔
1809
        <S as TryStream>::Error: std::error::Error,
38✔
1810
    {
38✔
1811
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1812

1813
        let cli_args = self.global_state_lock.cli().clone();
38✔
1814

1815
        let standing = self
38✔
1816
            .global_state_lock
38✔
1817
            .lock_guard()
38✔
1818
            .await
38✔
1819
            .net
1820
            .peer_databases
1821
            .peer_standings
1822
            .get(self.peer_address.ip())
38✔
1823
            .await
38✔
1824
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
38✔
1825

1826
        // Add peer to peer map
1827
        let peer_connection_info = PeerConnectionInfo::new(
38✔
1828
            self.peer_handshake_data.listen_port,
38✔
1829
            self.peer_address,
38✔
1830
            self.inbound_connection,
38✔
1831
        );
1832
        let new_peer = PeerInfo::new(
38✔
1833
            peer_connection_info,
38✔
1834
            &self.peer_handshake_data,
38✔
1835
            SystemTime::now(),
38✔
1836
            cli_args.peer_tolerance,
38✔
1837
        )
1838
        .with_standing(standing);
38✔
1839

1840
        // If timestamps are different, we currently just log a warning.
1841
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
38✔
1842
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
38✔
1843
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
38✔
1844
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
38✔
1845
        {
1846
            let own_datetime_utc: DateTime<Utc> =
×
1847
                new_peer.own_timestamp_connection_established.into();
×
1848
            let peer_datetime_utc: DateTime<Utc> =
×
1849
                new_peer.peer_timestamp_connection_established.into();
×
1850
            warn!(
×
1851
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1852
                new_peer.connected_address(),
×
1853
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1854
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1855
        }
38✔
1856

1857
        // Multiple tasks might attempt to set up a connection concurrently. So
1858
        // even though we've checked that this connection is allowed, this check
1859
        // could have been invalidated by another task, for one accepting an
1860
        // incoming connection from a peer we're currently connecting to. So we
1861
        // need to make the a check again while holding a write-lock, since
1862
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
1863
        // since it would have to be dropped before acquiring the write-lock.
1864
        {
1865
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
38✔
1866
            let peer_map = &mut global_state.net.peer_map;
38✔
1867
            if peer_map
38✔
1868
                .values()
38✔
1869
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
38✔
1870
            {
1871
                bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1872
            }
38✔
1873

1874
            if peer_map.len() >= cli_args.max_num_peers {
38✔
1875
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1876
            }
38✔
1877

1878
            if peer_map.contains_key(&self.peer_address) {
38✔
1879
                // This shouldn't be possible, unless the peer reports a different instance ID than
1880
                // for the other connection. Only a malignant client would do that.
1881
                bail!("Already connected to peer. Aborting connection");
×
1882
            }
38✔
1883

1884
            peer_map.insert(self.peer_address, new_peer);
38✔
1885
        }
1886

1887
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1888
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
38✔
1889

1890
        // If peer indicates more canonical block, request a block notification to catch up ASAP
1891
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
38✔
1892
            > self
38✔
1893
                .global_state_lock
38✔
1894
                .lock_guard()
38✔
1895
                .await
38✔
1896
                .chain
1897
                .light_state()
38✔
1898
                .kernel
1899
                .header
1900
                .cumulative_proof_of_work
1901
        {
1902
            // Send block notification request to catch up ASAP, in case we're
1903
            // behind the newly-connected peer.
1904
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1905
        }
38✔
1906

1907
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
38✔
1908
        debug!("Exited peer loop for {}", self.peer_address);
32✔
1909

1910
        close_peer_connected_callback(
32✔
1911
            self.global_state_lock.clone(),
32✔
1912
            self.peer_address,
32✔
1913
            &self.to_main_tx,
32✔
1914
        )
32✔
1915
        .await;
32✔
1916

1917
        debug!("Ending peer loop for {}", self.peer_address);
31✔
1918

1919
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1920
        // feature to have for testing purposes.
1921
        res
31✔
1922
    }
31✔
1923

1924
    /// Register graceful peer disconnection in the global state.
1925
    ///
1926
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1927
    ///
1928
    /// # Locking:
1929
    ///   * acquires `global_state_lock` for write
1930
    ///
1931
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1932
    async fn register_peer_disconnection(&mut self) {
×
1933
        let peer_id = self.peer_handshake_data.instance_id;
×
1934
        self.global_state_lock
×
1935
            .lock_guard_mut()
×
1936
            .await
×
1937
            .net
1938
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1939
    }
×
1940
}
1941

1942
#[cfg(test)]
1943
#[cfg_attr(coverage_nightly, coverage(off))]
1944
mod tests {
1945
    use macro_rules_attr::apply;
1946
    use rand::rngs::StdRng;
1947
    use rand::Rng;
1948
    use rand::SeedableRng;
1949
    use tokio::sync::mpsc::error::TryRecvError;
1950
    use tracing_test::traced_test;
1951

1952
    use super::*;
1953
    use crate::config_models::cli_args;
1954
    use crate::config_models::network::Network;
1955
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1956
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1957
    use crate::models::peer::transaction_notification::TransactionNotification;
1958
    use crate::models::state::mempool::upgrade_priority::UpgradePriority;
1959
    use crate::models::state::tx_creation_config::TxCreationConfig;
1960
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1961
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1962
    use crate::tests::shared::blocks::fake_valid_block_for_tests;
1963
    use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests;
1964
    use crate::tests::shared::globalstate::get_dummy_handshake_data_for_genesis;
1965
    use crate::tests::shared::globalstate::get_dummy_peer_connection_data_genesis;
1966
    use crate::tests::shared::globalstate::get_dummy_socket_address;
1967
    use crate::tests::shared::globalstate::get_test_genesis_setup;
1968
    use crate::tests::shared::mock_tx::invalid_empty_single_proof_transaction;
1969
    use crate::tests::shared::Action;
1970
    use crate::tests::shared::Mock;
1971
    use crate::tests::shared_tokio_runtime;
1972

1973
    #[traced_test]
1974
    #[apply(shared_tokio_runtime)]
1975
    async fn test_peer_loop_bye() -> Result<()> {
1976
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1977

1978
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1979
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default()).await?;
1980

1981
        let peer_address = get_dummy_socket_address(2);
1982
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1983
        let mut peer_loop_handler =
1984
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1985
        peer_loop_handler
1986
            .run_wrapper(mock, from_main_rx_clone)
1987
            .await?;
1988

1989
        assert_eq!(
1990
            2,
1991
            state_lock.lock_guard().await.net.peer_map.len(),
1992
            "peer map length must be back to 2 after goodbye"
1993
        );
1994

1995
        Ok(())
1996
    }
1997

1998
    #[traced_test]
1999
    #[apply(shared_tokio_runtime)]
2000
    async fn test_peer_loop_peer_list() {
2001
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
2002
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default())
2003
                .await
2004
                .unwrap();
2005

2006
        let mut peer_infos = state_lock
2007
            .lock_guard()
2008
            .await
2009
            .net
2010
            .peer_map
2011
            .clone()
2012
            .into_values()
2013
            .collect::<Vec<_>>();
2014
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2015
        let (peer_address0, instance_id0) = (
2016
            peer_infos[0].connected_address(),
2017
            peer_infos[0].instance_id(),
2018
        );
2019
        let (peer_address1, instance_id1) = (
2020
            peer_infos[1].connected_address(),
2021
            peer_infos[1].instance_id(),
2022
        );
2023

2024
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Beta, 2);
2025
        let expected_response = vec![
2026
            (peer_address0, instance_id0),
2027
            (peer_address1, instance_id1),
2028
            (sa2, hsd2.instance_id),
2029
        ];
2030
        let mock = Mock::new(vec![
2031
            Action::Read(PeerMessage::PeerListRequest),
2032
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
2033
            Action::Read(PeerMessage::Bye),
2034
        ]);
2035

2036
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2037

2038
        let mut peer_loop_handler =
2039
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
2040
        peer_loop_handler
2041
            .run_wrapper(mock, from_main_rx_clone)
2042
            .await
2043
            .unwrap();
2044

2045
        assert_eq!(
2046
            2,
2047
            state_lock.lock_guard().await.net.peer_map.len(),
2048
            "peer map must have length 2 after saying goodbye to peer 2"
2049
        );
2050
    }
2051

2052
    #[traced_test]
2053
    #[apply(shared_tokio_runtime)]
2054
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
2055
    {
2056
        let args = cli_args::Args::default();
2057
        let network = args.network;
2058
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
2059
            get_test_genesis_setup(network, 0, args).await?;
2060

2061
        let peer_address = get_dummy_socket_address(0);
2062
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
2063
        let peer_id = peer_handshake_data.instance_id;
2064
        let mut peer_loop_handler = PeerLoopHandler::new(
2065
            to_main_tx,
2066
            state_lock.clone(),
2067
            peer_address,
2068
            peer_handshake_data,
2069
            true,
2070
            1,
2071
        );
2072
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2073
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
2074

2075
        let global_state = state_lock.lock_guard().await;
2076
        assert!(global_state
2077
            .net
2078
            .last_disconnection_time_of_peer(peer_id)
2079
            .is_none());
2080

2081
        drop(to_main_rx);
2082
        drop(from_main_tx);
2083

2084
        Ok(())
2085
    }
2086

2087
    mod blocks {
2088
        use super::*;
2089

2090
        #[traced_test]
2091
        #[apply(shared_tokio_runtime)]
2092
        async fn different_genesis_test() -> Result<()> {
2093
            // In this scenario a peer provides another genesis block than what has been
2094
            // hardcoded. This should lead to the closing of the connection to this peer
2095
            // and a ban.
2096

2097
            let network = Network::Main;
2098
            let (
2099
                _peer_broadcast_tx,
2100
                from_main_rx_clone,
2101
                to_main_tx,
2102
                mut to_main_rx1,
2103
                state_lock,
2104
                hsd,
2105
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2106
            assert_eq!(1000, state_lock.cli().peer_tolerance);
2107
            let peer_address = get_dummy_socket_address(0);
2108

2109
            // Although the database is empty, `get_latest_block` still returns the genesis block,
2110
            // since that block is hardcoded.
2111
            let mut different_genesis_block = state_lock
2112
                .lock_guard()
2113
                .await
2114
                .chain
2115
                .archival_state()
2116
                .get_tip()
2117
                .await;
2118

2119
            different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2120
            let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2121
                &different_genesis_block,
2122
                Timestamp::hours(1),
2123
                StdRng::seed_from_u64(5550001).random(),
2124
                network,
2125
            )
2126
            .await;
2127
            let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2128
                block_1_with_different_genesis.try_into().unwrap(),
2129
            )))]);
2130

2131
            let mut peer_loop_handler = PeerLoopHandler::new(
2132
                to_main_tx.clone(),
2133
                state_lock.clone(),
2134
                peer_address,
2135
                hsd,
2136
                true,
2137
                1,
2138
            );
2139
            let res = peer_loop_handler
2140
                .run_wrapper(mock, from_main_rx_clone)
2141
                .await;
2142
            assert!(
2143
                res.is_err(),
2144
                "run_wrapper must return failure when genesis is different"
2145
            );
2146

2147
            match to_main_rx1.recv().await {
2148
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2149
                _ => bail!("Must receive remove of peer block max height"),
2150
            }
2151

2152
            // Verify that no further message was sent to main loop
2153
            match to_main_rx1.try_recv() {
2154
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2155
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2156
            };
2157

2158
            drop(to_main_tx);
2159

2160
            let peer_standing = state_lock
2161
                .lock_guard()
2162
                .await
2163
                .net
2164
                .get_peer_standing_from_database(peer_address.ip())
2165
                .await;
2166
            assert_eq!(
2167
                -i32::from(state_lock.cli().peer_tolerance),
2168
                peer_standing.unwrap().standing
2169
            );
2170
            assert_eq!(
2171
                NegativePeerSanction::DifferentGenesis,
2172
                peer_standing.unwrap().latest_punishment.unwrap().0
2173
            );
2174

2175
            Ok(())
2176
        }
2177

2178
        #[traced_test]
2179
        #[apply(shared_tokio_runtime)]
2180
        async fn block_without_valid_pow_test() -> Result<()> {
2181
            // In this scenario, a block without a valid PoW is received. This block should be rejected
2182
            // by the peer loop and a notification should never reach the main loop.
2183

2184
            let network = Network::Main;
2185
            let (
2186
                peer_broadcast_tx,
2187
                _from_main_rx_clone,
2188
                to_main_tx,
2189
                mut to_main_rx1,
2190
                state_lock,
2191
                hsd,
2192
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2193
            let peer_address = get_dummy_socket_address(0);
2194
            let genesis_block: Block = state_lock
2195
                .lock_guard()
2196
                .await
2197
                .chain
2198
                .archival_state()
2199
                .get_tip()
2200
                .await;
2201

2202
            // Make a with hash above what the implied threshold from
2203
            let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
2204
                &genesis_block,
2205
                Timestamp::hours(1),
2206
                StdRng::seed_from_u64(5550001).random(),
2207
                network,
2208
            )
2209
            .await;
2210

2211
            // This *probably* is invalid PoW -- and needs to be for this test to
2212
            // work.
2213
            block_without_valid_pow.set_header_nonce(Digest::default());
2214

2215
            // Sending an invalid block will not necessarily result in a ban. This depends on the peer
2216
            // tolerance that is set in the client. For this reason, we include a "Bye" here.
2217
            let mock = Mock::new(vec![
2218
                Action::Read(PeerMessage::Block(Box::new(
2219
                    block_without_valid_pow.clone().try_into().unwrap(),
2220
                ))),
2221
                Action::Read(PeerMessage::Bye),
2222
            ]);
2223

2224
            let from_main_rx_clone = peer_broadcast_tx.subscribe();
2225

2226
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2227
                to_main_tx.clone(),
2228
                state_lock.clone(),
2229
                peer_address,
2230
                hsd,
2231
                true,
2232
                1,
2233
                block_without_valid_pow.header().timestamp,
2234
            );
2235
            peer_loop_handler
2236
                .run_wrapper(mock, from_main_rx_clone)
2237
                .await
2238
                .expect("sending (one) invalid block should not result in closed connection");
2239

2240
            match to_main_rx1.recv().await {
2241
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2242
                _ => bail!("Must receive remove of peer block max height"),
2243
            }
2244

2245
            // Verify that no further message was sent to main loop
2246
            match to_main_rx1.try_recv() {
2247
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2248
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2249
            };
2250

2251
            // We need to have the transmitter in scope until we have received from it
2252
            // otherwise the receiver will report the disconnected error when we attempt
2253
            // to read from it. And the purpose is to verify that the channel is empty,
2254
            // not that it has been closed.
2255
            drop(to_main_tx);
2256

2257
            // Verify that peer standing was stored in database
2258
            let standing = state_lock
2259
                .lock_guard()
2260
                .await
2261
                .net
2262
                .peer_databases
2263
                .peer_standings
2264
                .get(peer_address.ip())
2265
                .await
2266
                .unwrap();
2267
            assert!(
2268
                standing.standing < 0,
2269
                "Peer must be sanctioned for sending a bad block"
2270
            );
2271

2272
            Ok(())
2273
        }
2274

2275
        #[traced_test]
2276
        #[apply(shared_tokio_runtime)]
2277
        async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2278
            // The scenario tested here is that a client receives a block that is already
2279
            // known and stored. The expected behavior is to ignore the block and not send
2280
            // a message to the main task.
2281

2282
            let network = Network::Main;
2283
            let (
2284
                peer_broadcast_tx,
2285
                _from_main_rx_clone,
2286
                to_main_tx,
2287
                mut to_main_rx1,
2288
                mut alice,
2289
                hsd,
2290
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2291
            let peer_address = get_dummy_socket_address(0);
2292
            let genesis_block: Block = Block::genesis(network);
2293

2294
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
2295
            let block_1 =
2296
                fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2297
            assert!(
2298
                block_1.is_valid(&genesis_block, now, network).await,
2299
                "Block must be valid for this test to make sense"
2300
            );
2301
            alice.set_new_tip(block_1.clone()).await?;
2302

2303
            let mock_peer_messages = Mock::new(vec![
2304
                Action::Read(PeerMessage::Block(Box::new(
2305
                    block_1.clone().try_into().unwrap(),
2306
                ))),
2307
                Action::Read(PeerMessage::Bye),
2308
            ]);
2309

2310
            let from_main_rx_clone = peer_broadcast_tx.subscribe();
2311

2312
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2313
                to_main_tx.clone(),
2314
                alice.clone(),
2315
                peer_address,
2316
                hsd,
2317
                false,
2318
                1,
2319
                block_1.header().timestamp,
2320
            );
2321
            alice_peer_loop_handler
2322
                .run_wrapper(mock_peer_messages, from_main_rx_clone)
2323
                .await?;
2324

2325
            match to_main_rx1.recv().await {
2326
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2327
                other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2328
            }
2329
            match to_main_rx1.try_recv() {
2330
                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2331
                _ => bail!("Block notification must not be sent for block with invalid PoW"),
2332
            };
2333
            drop(to_main_tx);
2334

2335
            if !alice.lock_guard().await.net.peer_map.is_empty() {
2336
                bail!("peer map must be empty after closing connection gracefully");
2337
            }
2338

2339
            Ok(())
2340
        }
2341

2342
        #[traced_test]
2343
        #[apply(shared_tokio_runtime)]
2344
        async fn block_request_batch_simple() {
2345
            // Scenario: Six blocks (including genesis) are known. Peer requests
2346
            // from all possible starting points, and client responds with the
2347
            // correct list of blocks.
2348
            let network = Network::Main;
2349
            let (
2350
                _peer_broadcast_tx,
2351
                from_main_rx_clone,
2352
                to_main_tx,
2353
                _to_main_rx1,
2354
                mut state_lock,
2355
                hsd,
2356
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
2357
                .await
2358
                .unwrap();
2359
            let genesis_block: Block = Block::genesis(network);
2360
            let peer_address = get_dummy_socket_address(0);
2361
            let [block_1, block_2, block_3, block_4, block_5] =
2362
                fake_valid_sequence_of_blocks_for_tests(
2363
                    &genesis_block,
2364
                    Timestamp::hours(1),
2365
                    StdRng::seed_from_u64(5550001).random(),
2366
                    network,
2367
                )
2368
                .await;
2369
            let blocks = vec![
2370
                genesis_block,
2371
                block_1,
2372
                block_2,
2373
                block_3,
2374
                block_4,
2375
                block_5.clone(),
2376
            ];
2377
            for block in blocks.iter().skip(1) {
2378
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
2379
            }
2380

2381
            let mmra = state_lock
2382
                .lock_guard()
2383
                .await
2384
                .chain
2385
                .archival_state()
2386
                .archival_block_mmr
2387
                .ammr()
2388
                .to_accumulator_async()
2389
                .await;
2390
            for i in 0..=4 {
2391
                let expected_response = {
2392
                    let state = state_lock.lock_guard().await;
2393
                    let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
2394
                    PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
2395
                        .await
2396
                        .unwrap()
2397
                };
2398
                let mock = Mock::new(vec![
2399
                    Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2400
                        known_blocks: vec![blocks[i].hash()],
2401
                        max_response_len: 14,
2402
                        anchor: mmra.clone(),
2403
                    })),
2404
                    Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
2405
                    Action::Read(PeerMessage::Bye),
2406
                ]);
2407
                let mut peer_loop_handler = PeerLoopHandler::new(
2408
                    to_main_tx.clone(),
2409
                    state_lock.clone(),
2410
                    peer_address,
2411
                    hsd.clone(),
2412
                    false,
2413
                    1,
2414
                );
2415

2416
                peer_loop_handler
2417
                    .run_wrapper(mock, from_main_rx_clone.resubscribe())
2418
                    .await
2419
                    .unwrap();
2420
            }
2421
        }
2422

2423
        #[traced_test]
2424
        #[apply(shared_tokio_runtime)]
2425
        async fn block_request_batch_in_order_test() -> Result<()> {
2426
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2427
            // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
2428
            // are returned.
2429

2430
            let network = Network::Main;
2431
            let (
2432
                _peer_broadcast_tx,
2433
                from_main_rx_clone,
2434
                to_main_tx,
2435
                _to_main_rx1,
2436
                mut state_lock,
2437
                hsd,
2438
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2439
            let genesis_block: Block = Block::genesis(network);
2440
            let peer_address = get_dummy_socket_address(0);
2441
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2442
                &genesis_block,
2443
                Timestamp::hours(1),
2444
                StdRng::seed_from_u64(5550001).random(),
2445
                network,
2446
            )
2447
            .await;
2448
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2449
                &block_1,
2450
                Timestamp::hours(1),
2451
                StdRng::seed_from_u64(5550002).random(),
2452
                network,
2453
            )
2454
            .await;
2455
            assert_ne!(block_2_b.hash(), block_2_a.hash());
2456

2457
            state_lock.set_new_tip(block_1.clone()).await?;
2458
            state_lock.set_new_tip(block_2_a.clone()).await?;
2459
            state_lock.set_new_tip(block_2_b.clone()).await?;
2460
            state_lock.set_new_tip(block_3_b.clone()).await?;
2461
            state_lock.set_new_tip(block_3_a.clone()).await?;
2462

2463
            let anchor = state_lock
2464
                .lock_guard()
2465
                .await
2466
                .chain
2467
                .archival_state()
2468
                .archival_block_mmr
2469
                .ammr()
2470
                .to_accumulator_async()
2471
                .await;
2472
            let response_1 = {
2473
                let state_lock = state_lock.lock_guard().await;
2474
                PeerLoopHandler::batch_response(
2475
                    &state_lock,
2476
                    vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
2477
                    &anchor,
2478
                )
2479
                .await
2480
                .unwrap()
2481
            };
2482

2483
            let mut mock = Mock::new(vec![
2484
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2485
                    known_blocks: vec![genesis_block.hash()],
2486
                    max_response_len: 14,
2487
                    anchor: anchor.clone(),
2488
                })),
2489
                Action::Write(PeerMessage::BlockResponseBatch(response_1)),
2490
                Action::Read(PeerMessage::Bye),
2491
            ]);
2492

2493
            let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
2494
                to_main_tx.clone(),
2495
                state_lock.clone(),
2496
                peer_address,
2497
                hsd.clone(),
2498
                false,
2499
                1,
2500
                block_3_a.header().timestamp,
2501
            );
2502

2503
            peer_loop_handler_1
2504
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
2505
                .await?;
2506

2507
            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2508
            let response_2 = {
2509
                let state_lock = state_lock.lock_guard().await;
2510
                PeerLoopHandler::batch_response(
2511
                    &state_lock,
2512
                    vec![block_2_a, block_3_a.clone()],
2513
                    &anchor,
2514
                )
2515
                .await
2516
                .unwrap()
2517
            };
2518
            mock = Mock::new(vec![
2519
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2520
                    known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
2521
                    max_response_len: 14,
2522
                    anchor,
2523
                })),
2524
                Action::Write(PeerMessage::BlockResponseBatch(response_2)),
2525
                Action::Read(PeerMessage::Bye),
2526
            ]);
2527

2528
            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2529
                to_main_tx.clone(),
2530
                state_lock.clone(),
2531
                peer_address,
2532
                hsd,
2533
                false,
2534
                1,
2535
                block_3_a.header().timestamp,
2536
            );
2537

2538
            peer_loop_handler_2
2539
                .run_wrapper(mock, from_main_rx_clone)
2540
                .await?;
2541

2542
            Ok(())
2543
        }
2544

2545
        #[traced_test]
2546
        #[apply(shared_tokio_runtime)]
2547
        async fn block_request_batch_out_of_order_test() -> Result<()> {
2548
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2549
            // A peer requests a batch of blocks starting from block 1, but the peer supplies their
2550
            // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
2551
            // The blocks will be supplied in the correct order but starting from the first digest in
2552
            // the list that is known and canonical.
2553

2554
            let network = Network::Main;
2555
            let (
2556
                _peer_broadcast_tx,
2557
                from_main_rx_clone,
2558
                to_main_tx,
2559
                _to_main_rx1,
2560
                mut state_lock,
2561
                hsd,
2562
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2563
            let genesis_block = Block::genesis(network);
2564
            let peer_address = get_dummy_socket_address(0);
2565
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2566
                &genesis_block,
2567
                Timestamp::hours(1),
2568
                StdRng::seed_from_u64(5550001).random(),
2569
                network,
2570
            )
2571
            .await;
2572
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2573
                &block_1,
2574
                Timestamp::hours(1),
2575
                StdRng::seed_from_u64(5550002).random(),
2576
                network,
2577
            )
2578
            .await;
2579
            assert_ne!(block_2_a.hash(), block_2_b.hash());
2580

2581
            state_lock.set_new_tip(block_1.clone()).await?;
2582
            state_lock.set_new_tip(block_2_a.clone()).await?;
2583
            state_lock.set_new_tip(block_2_b.clone()).await?;
2584
            state_lock.set_new_tip(block_3_b.clone()).await?;
2585
            state_lock.set_new_tip(block_3_a.clone()).await?;
2586

2587
            // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2588
            let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
2589
            expected_anchor.append(block_3_a.hash());
2590
            let state_anchor = state_lock
2591
                .lock_guard()
2592
                .await
2593
                .chain
2594
                .archival_state()
2595
                .archival_block_mmr
2596
                .ammr()
2597
                .to_accumulator_async()
2598
                .await;
2599
            assert_eq!(
2600
                expected_anchor, state_anchor,
2601
                "Catching assumption about MMRA in tip and in archival state"
2602
            );
2603

2604
            let response = {
2605
                let state_lock = state_lock.lock_guard().await;
2606
                PeerLoopHandler::batch_response(
2607
                    &state_lock,
2608
                    vec![block_1.clone(), block_2_a, block_3_a.clone()],
2609
                    &expected_anchor,
2610
                )
2611
                .await
2612
                .unwrap()
2613
            };
2614
            let mock = Mock::new(vec![
2615
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2616
                    known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
2617
                    max_response_len: 14,
2618
                    anchor: expected_anchor,
2619
                })),
2620
                // Since genesis block is the 1st known in the list of known blocks,
2621
                // it's immediate descendent, block_1, is the first one returned.
2622
                Action::Write(PeerMessage::BlockResponseBatch(response)),
2623
                Action::Read(PeerMessage::Bye),
2624
            ]);
2625

2626
            let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2627
                to_main_tx.clone(),
2628
                state_lock.clone(),
2629
                peer_address,
2630
                hsd,
2631
                false,
2632
                1,
2633
                block_3_a.header().timestamp,
2634
            );
2635

2636
            peer_loop_handler_2
2637
                .run_wrapper(mock, from_main_rx_clone)
2638
                .await?;
2639

2640
            Ok(())
2641
        }
2642

2643
        #[traced_test]
2644
        #[apply(shared_tokio_runtime)]
2645
        async fn request_unknown_height_doesnt_crash() {
2646
            // Scenario: Only genesis block is known. Peer requests block of height
2647
            // 2.
2648
            let network = Network::Main;
2649
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2650
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2651
                    .await
2652
                    .unwrap();
2653
            let peer_address = get_dummy_socket_address(0);
2654
            let mock = Mock::new(vec![
2655
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2656
                Action::Read(PeerMessage::Bye),
2657
            ]);
2658

2659
            let mut peer_loop_handler = PeerLoopHandler::new(
2660
                to_main_tx.clone(),
2661
                state_lock.clone(),
2662
                peer_address,
2663
                hsd,
2664
                false,
2665
                1,
2666
            );
2667

2668
            // This will return error if seen read/write order does not match that of the
2669
            // mocked object.
2670
            peer_loop_handler
2671
                .run_wrapper(mock, from_main_rx_clone)
2672
                .await
2673
                .unwrap();
2674

2675
            // Verify that peer is sanctioned for this nonsense.
2676
            assert!(state_lock
2677
                .lock_guard()
2678
                .await
2679
                .net
2680
                .get_peer_standing_from_database(peer_address.ip())
2681
                .await
2682
                .unwrap()
2683
                .standing
2684
                .is_negative());
2685
        }
2686

2687
        #[traced_test]
2688
        #[apply(shared_tokio_runtime)]
2689
        async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
2690
            // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2691
            // A peer requests a block at height 2. Verify that the correct block at height 2 is
2692
            // returned.
2693

2694
            let network = Network::Main;
2695
            let (
2696
                _peer_broadcast_tx,
2697
                from_main_rx_clone,
2698
                to_main_tx,
2699
                _to_main_rx1,
2700
                mut state_lock,
2701
                hsd,
2702
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2703
            let genesis_block = Block::genesis(network);
2704
            let peer_address = get_dummy_socket_address(0);
2705

2706
            let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2707
                &genesis_block,
2708
                Timestamp::hours(1),
2709
                StdRng::seed_from_u64(5550001).random(),
2710
                network,
2711
            )
2712
            .await;
2713
            let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2714
                &block_1,
2715
                Timestamp::hours(1),
2716
                StdRng::seed_from_u64(5550002).random(),
2717
                network,
2718
            )
2719
            .await;
2720
            assert_ne!(block_2_a.hash(), block_2_b.hash());
2721

2722
            state_lock.set_new_tip(block_1.clone()).await?;
2723
            state_lock.set_new_tip(block_2_a.clone()).await?;
2724
            state_lock.set_new_tip(block_2_b.clone()).await?;
2725
            state_lock.set_new_tip(block_3_b.clone()).await?;
2726
            state_lock.set_new_tip(block_3_a.clone()).await?;
2727

2728
            let mock = Mock::new(vec![
2729
                Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2730
                Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
2731
                Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
2732
                Action::Write(PeerMessage::Block(Box::new(
2733
                    block_3_a.clone().try_into().unwrap(),
2734
                ))),
2735
                Action::Read(PeerMessage::Bye),
2736
            ]);
2737

2738
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2739
                to_main_tx.clone(),
2740
                state_lock.clone(),
2741
                peer_address,
2742
                hsd,
2743
                false,
2744
                1,
2745
                block_3_a.header().timestamp,
2746
            );
2747

2748
            // This will return error if seen read/write order does not match that of the
2749
            // mocked object.
2750
            peer_loop_handler
2751
                .run_wrapper(mock, from_main_rx_clone)
2752
                .await?;
2753

2754
            Ok(())
2755
        }
2756

2757
        #[traced_test]
2758
        #[apply(shared_tokio_runtime)]
2759
        async fn receival_of_block_notification_height_1() {
2760
            // Scenario: client only knows genesis block. Then receives block
2761
            // notification of height 1. Must request block 1.
2762
            let network = Network::Main;
2763
            let mut rng = StdRng::seed_from_u64(5552401);
2764
            let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2765
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2766
                    .await
2767
                    .unwrap();
2768
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2769
            let notification_height1 = (&block_1).into();
2770
            let mock = Mock::new(vec![
2771
                Action::Read(PeerMessage::BlockNotification(notification_height1)),
2772
                Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2773
                Action::Read(PeerMessage::Bye),
2774
            ]);
2775

2776
            let peer_address = get_dummy_socket_address(0);
2777
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2778
                to_main_tx.clone(),
2779
                state_lock.clone(),
2780
                peer_address,
2781
                hsd,
2782
                false,
2783
                1,
2784
                block_1.header().timestamp,
2785
            );
2786
            peer_loop_handler
2787
                .run_wrapper(mock, from_main_rx_clone)
2788
                .await
2789
                .unwrap();
2790

2791
            drop(to_main_rx1);
2792
        }
2793

2794
        #[traced_test]
2795
        #[apply(shared_tokio_runtime)]
2796
        async fn receive_block_request_by_height_block_7() {
2797
            // Scenario: client only knows blocks up to height 7. Then receives block-
2798
            // request-by-height for height 7. Must respond with block 7.
2799
            let network = Network::Main;
2800
            let mut rng = StdRng::seed_from_u64(5552401);
2801
            let (
2802
                _peer_broadcast_tx,
2803
                from_main_rx_clone,
2804
                to_main_tx,
2805
                to_main_rx1,
2806
                mut state_lock,
2807
                hsd,
2808
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
2809
                .await
2810
                .unwrap();
2811
            let genesis_block = Block::genesis(network);
2812
            let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2813
                &genesis_block,
2814
                Timestamp::hours(1),
2815
                rng.random(),
2816
                network,
2817
            )
2818
            .await;
2819
            let block7 = blocks.last().unwrap().to_owned();
2820
            let tip_height: u64 = block7.header().height.into();
2821
            assert_eq!(7, tip_height);
2822

2823
            for block in &blocks {
2824
                state_lock.set_new_tip(block.to_owned()).await.unwrap();
2825
            }
2826

2827
            let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2828
            let mock = Mock::new(vec![
2829
                Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2830
                Action::Write(block7_response),
2831
                Action::Read(PeerMessage::Bye),
2832
            ]);
2833

2834
            let peer_address = get_dummy_socket_address(0);
2835
            let mut peer_loop_handler = PeerLoopHandler::new(
2836
                to_main_tx.clone(),
2837
                state_lock.clone(),
2838
                peer_address,
2839
                hsd,
2840
                false,
2841
                1,
2842
            );
2843
            peer_loop_handler
2844
                .run_wrapper(mock, from_main_rx_clone)
2845
                .await
2846
                .unwrap();
2847

2848
            drop(to_main_rx1);
2849
        }
2850

2851
        #[traced_test]
2852
        #[apply(shared_tokio_runtime)]
2853
        async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2854
            // Scenario: client only knows genesis block. Then receives block 1.
2855

2856
            let network = Network::Main;
2857
            let mut rng = StdRng::seed_from_u64(5550001);
2858
            let (
2859
                _peer_broadcast_tx,
2860
                from_main_rx_clone,
2861
                to_main_tx,
2862
                mut to_main_rx1,
2863
                state_lock,
2864
                hsd,
2865
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2866
            let peer_address = get_dummy_socket_address(0);
2867

2868
            let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2869
            let mock = Mock::new(vec![
2870
                Action::Read(PeerMessage::Block(Box::new(
2871
                    block_1.clone().try_into().unwrap(),
2872
                ))),
2873
                Action::Read(PeerMessage::Bye),
2874
            ]);
2875

2876
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2877
                to_main_tx.clone(),
2878
                state_lock.clone(),
2879
                peer_address,
2880
                hsd,
2881
                false,
2882
                1,
2883
                block_1.header().timestamp,
2884
            );
2885
            peer_loop_handler
2886
                .run_wrapper(mock, from_main_rx_clone)
2887
                .await?;
2888

2889
            // Verify that a block was sent to `main_loop`
2890
            match to_main_rx1.recv().await {
2891
                Some(PeerTaskToMain::NewBlocks(_block)) => (),
2892
                _ => bail!("Did not find msg sent to main task"),
2893
            };
2894

2895
            match to_main_rx1.recv().await {
2896
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2897
                _ => bail!("Must receive remove of peer block max height"),
2898
            }
2899

2900
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2901
                bail!("peer map must be empty after closing connection gracefully");
2902
            }
2903

2904
            Ok(())
2905
        }
2906

2907
        #[traced_test]
2908
        #[apply(shared_tokio_runtime)]
2909
        async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
2910
            // In this scenario, the client only knows the genesis block (block 0) and then
2911
            // receives block 2, meaning that block 1 will have to be requested.
2912

2913
            let network = Network::Main;
2914
            let (
2915
                _peer_broadcast_tx,
2916
                from_main_rx_clone,
2917
                to_main_tx,
2918
                mut to_main_rx1,
2919
                state_lock,
2920
                hsd,
2921
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2922
            let peer_address = get_dummy_socket_address(0);
2923
            let genesis_block: Block = state_lock
2924
                .lock_guard()
2925
                .await
2926
                .chain
2927
                .archival_state()
2928
                .get_tip()
2929
                .await;
2930
            let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
2931
                &genesis_block,
2932
                Timestamp::hours(1),
2933
                StdRng::seed_from_u64(5550001).random(),
2934
                network,
2935
            )
2936
            .await;
2937

2938
            let mock = Mock::new(vec![
2939
                Action::Read(PeerMessage::Block(Box::new(
2940
                    block_2.clone().try_into().unwrap(),
2941
                ))),
2942
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
2943
                Action::Read(PeerMessage::Block(Box::new(
2944
                    block_1.clone().try_into().unwrap(),
2945
                ))),
2946
                Action::Read(PeerMessage::Bye),
2947
            ]);
2948

2949
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2950
                to_main_tx.clone(),
2951
                state_lock.clone(),
2952
                peer_address,
2953
                hsd,
2954
                true,
2955
                1,
2956
                block_2.header().timestamp,
2957
            );
2958
            peer_loop_handler
2959
                .run_wrapper(mock, from_main_rx_clone)
2960
                .await?;
2961

2962
            match to_main_rx1.recv().await {
2963
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
2964
                    if blocks[0].hash() != block_1.hash() {
2965
                        bail!("1st received block by main loop must be block 1");
2966
                    }
2967
                    if blocks[1].hash() != block_2.hash() {
2968
                        bail!("2nd received block by main loop must be block 2");
2969
                    }
2970
                }
2971
                _ => bail!("Did not find msg sent to main task 1"),
2972
            };
2973
            match to_main_rx1.recv().await {
2974
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2975
                _ => bail!("Must receive remove of peer block max height"),
2976
            }
2977

2978
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2979
                bail!("peer map must be empty after closing connection gracefully");
2980
            }
2981

2982
            Ok(())
2983
        }
2984

2985
        #[traced_test]
2986
        #[apply(shared_tokio_runtime)]
2987
        async fn prevent_ram_exhaustion_test() -> Result<()> {
2988
            // In this scenario the peer sends more blocks than the client allows to store in the
2989
            // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
2990
            // process as the alternative is that the program will crash because it runs out of RAM.
2991

2992
            let network = Network::Main;
2993
            let mut rng = StdRng::seed_from_u64(5550001);
2994
            let (
2995
                _peer_broadcast_tx,
2996
                from_main_rx_clone,
2997
                to_main_tx,
2998
                mut to_main_rx1,
2999
                mut state_lock,
3000
                _hsd,
3001
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3002
            let genesis_block = Block::genesis(network);
3003

3004
            // Restrict max number of blocks held in memory to 2.
3005
            let mut cli = state_lock.cli().clone();
3006
            cli.sync_mode_threshold = 2;
3007
            state_lock.set_cli(cli).await;
3008

3009
            let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Beta, 1);
3010
            let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3011
                &genesis_block,
3012
                Timestamp::hours(1),
3013
                rng.random(),
3014
                network,
3015
            )
3016
            .await;
3017
            state_lock.set_new_tip(block_1.clone()).await?;
3018

3019
            let mock = Mock::new(vec![
3020
                Action::Read(PeerMessage::Block(Box::new(
3021
                    block_4.clone().try_into().unwrap(),
3022
                ))),
3023
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3024
                Action::Read(PeerMessage::Block(Box::new(
3025
                    block_3.clone().try_into().unwrap(),
3026
                ))),
3027
                Action::Read(PeerMessage::Bye),
3028
            ]);
3029

3030
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3031
                to_main_tx.clone(),
3032
                state_lock.clone(),
3033
                peer_address1,
3034
                hsd1,
3035
                true,
3036
                1,
3037
                block_4.header().timestamp,
3038
            );
3039
            peer_loop_handler
3040
                .run_wrapper(mock, from_main_rx_clone)
3041
                .await?;
3042

3043
            match to_main_rx1.recv().await {
3044
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3045
                _ => bail!("Must receive remove of peer block max height"),
3046
            }
3047

3048
            // Verify that no block is sent to main loop.
3049
            match to_main_rx1.try_recv() {
3050
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
3051
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
3052
        };
3053
            drop(to_main_tx);
3054

3055
            // Verify that peer is sanctioned for failed fork reconciliation attempt
3056
            assert!(state_lock
3057
                .lock_guard()
3058
                .await
3059
                .net
3060
                .get_peer_standing_from_database(peer_address1.ip())
3061
                .await
3062
                .unwrap()
3063
                .standing
3064
                .is_negative());
3065

3066
            Ok(())
3067
        }
3068

3069
        #[traced_test]
3070
        #[apply(shared_tokio_runtime)]
3071
        async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3072
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3073
            // then receives block 4, meaning that block 3 and 2 will have to be requested.
3074

3075
            let network = Network::Main;
3076
            let (
3077
                _peer_broadcast_tx,
3078
                from_main_rx_clone,
3079
                to_main_tx,
3080
                mut to_main_rx1,
3081
                mut state_lock,
3082
                hsd,
3083
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3084
                .await
3085
                .unwrap();
3086
            let peer_address: SocketAddr = get_dummy_socket_address(0);
3087
            let genesis_block = Block::genesis(network);
3088
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3089
                &genesis_block,
3090
                Timestamp::hours(1),
3091
                StdRng::seed_from_u64(5550001).random(),
3092
                network,
3093
            )
3094
            .await;
3095
            state_lock.set_new_tip(block_1.clone()).await.unwrap();
3096

3097
            let mock = Mock::new(vec![
3098
                Action::Read(PeerMessage::Block(Box::new(
3099
                    block_4.clone().try_into().unwrap(),
3100
                ))),
3101
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3102
                Action::Read(PeerMessage::Block(Box::new(
3103
                    block_3.clone().try_into().unwrap(),
3104
                ))),
3105
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3106
                Action::Read(PeerMessage::Block(Box::new(
3107
                    block_2.clone().try_into().unwrap(),
3108
                ))),
3109
                Action::Read(PeerMessage::Bye),
3110
            ]);
3111

3112
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3113
                to_main_tx.clone(),
3114
                state_lock.clone(),
3115
                peer_address,
3116
                hsd,
3117
                true,
3118
                1,
3119
                block_4.header().timestamp,
3120
            );
3121
            peer_loop_handler
3122
                .run_wrapper(mock, from_main_rx_clone)
3123
                .await
3124
                .unwrap();
3125

3126
            let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3127
                panic!("Did not find msg sent to main task");
3128
            };
3129
            assert_eq!(blocks[0].hash(), block_2.hash());
3130
            assert_eq!(blocks[1].hash(), block_3.hash());
3131
            assert_eq!(blocks[2].hash(), block_4.hash());
3132

3133
            let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3134
                panic!("Must receive remove of peer block max height");
3135
            };
3136

3137
            assert!(
3138
                state_lock.lock_guard().await.net.peer_map.is_empty(),
3139
                "peer map must be empty after closing connection gracefully"
3140
            );
3141
        }
3142

3143
        #[traced_test]
3144
        #[apply(shared_tokio_runtime)]
3145
        async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
3146
            // In this scenario, the client only knows the genesis block (block 0) and then
3147
            // receives block 3, meaning that block 2 and 1 will have to be requested.
3148

3149
            let network = Network::Main;
3150
            let (
3151
                _peer_broadcast_tx,
3152
                from_main_rx_clone,
3153
                to_main_tx,
3154
                mut to_main_rx1,
3155
                state_lock,
3156
                hsd,
3157
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3158
            let peer_address = get_dummy_socket_address(0);
3159
            let genesis_block = Block::genesis(network);
3160

3161
            let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
3162
                &genesis_block,
3163
                Timestamp::hours(1),
3164
                StdRng::seed_from_u64(5550001).random(),
3165
                network,
3166
            )
3167
            .await;
3168

3169
            let mock = Mock::new(vec![
3170
                Action::Read(PeerMessage::Block(Box::new(
3171
                    block_3.clone().try_into().unwrap(),
3172
                ))),
3173
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3174
                Action::Read(PeerMessage::Block(Box::new(
3175
                    block_2.clone().try_into().unwrap(),
3176
                ))),
3177
                Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3178
                Action::Read(PeerMessage::Block(Box::new(
3179
                    block_1.clone().try_into().unwrap(),
3180
                ))),
3181
                Action::Read(PeerMessage::Bye),
3182
            ]);
3183

3184
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3185
                to_main_tx.clone(),
3186
                state_lock.clone(),
3187
                peer_address,
3188
                hsd,
3189
                true,
3190
                1,
3191
                block_3.header().timestamp,
3192
            );
3193
            peer_loop_handler
3194
                .run_wrapper(mock, from_main_rx_clone)
3195
                .await?;
3196

3197
            match to_main_rx1.recv().await {
3198
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3199
                    if blocks[0].hash() != block_1.hash() {
3200
                        bail!("1st received block by main loop must be block 1");
3201
                    }
3202
                    if blocks[1].hash() != block_2.hash() {
3203
                        bail!("2nd received block by main loop must be block 2");
3204
                    }
3205
                    if blocks[2].hash() != block_3.hash() {
3206
                        bail!("3rd received block by main loop must be block 3");
3207
                    }
3208
                }
3209
                _ => bail!("Did not find msg sent to main task"),
3210
            };
3211
            match to_main_rx1.recv().await {
3212
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3213
                _ => bail!("Must receive remove of peer block max height"),
3214
            }
3215

3216
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3217
                bail!("peer map must be empty after closing connection gracefully");
3218
            }
3219

3220
            Ok(())
3221
        }
3222

3223
        #[traced_test]
3224
        #[apply(shared_tokio_runtime)]
3225
        async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3226
            // In this scenario, the client know the genesis block (block 0) and block 1, it
3227
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3228
            // But the requests are interrupted by the peer sending another message: a new block
3229
            // notification.
3230

3231
            let network = Network::Main;
3232
            let (
3233
                _peer_broadcast_tx,
3234
                from_main_rx_clone,
3235
                to_main_tx,
3236
                mut to_main_rx1,
3237
                mut state_lock,
3238
                hsd,
3239
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3240
            let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3241
            let genesis_block: Block = state_lock
3242
                .lock_guard()
3243
                .await
3244
                .chain
3245
                .archival_state()
3246
                .get_tip()
3247
                .await;
3248

3249
            let [block_1, block_2, block_3, block_4, block_5] =
3250
                fake_valid_sequence_of_blocks_for_tests(
3251
                    &genesis_block,
3252
                    Timestamp::hours(1),
3253
                    StdRng::seed_from_u64(5550001).random(),
3254
                    network,
3255
                )
3256
                .await;
3257
            state_lock.set_new_tip(block_1.clone()).await?;
3258

3259
            let mock = Mock::new(vec![
3260
                Action::Read(PeerMessage::Block(Box::new(
3261
                    block_4.clone().try_into().unwrap(),
3262
                ))),
3263
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3264
                Action::Read(PeerMessage::Block(Box::new(
3265
                    block_3.clone().try_into().unwrap(),
3266
                ))),
3267
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3268
                //
3269
                // Now make the interruption of the block reconciliation process
3270
                Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3271
                //
3272
                // Complete the block reconciliation process by requesting the last block
3273
                // in this process, to get back to a mutually known block.
3274
                Action::Read(PeerMessage::Block(Box::new(
3275
                    block_2.clone().try_into().unwrap(),
3276
                ))),
3277
                //
3278
                // Then anticipate the request of the block that was announced
3279
                // in the interruption.
3280
                // Note that we cannot anticipate the response, as only the main
3281
                // task writes to the database. And the database needs to be updated
3282
                // for the handling of block 5 to be done correctly.
3283
                Action::Write(PeerMessage::BlockRequestByHeight(
3284
                    block_5.kernel.header.height,
3285
                )),
3286
                Action::Read(PeerMessage::Bye),
3287
            ]);
3288

3289
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3290
                to_main_tx.clone(),
3291
                state_lock.clone(),
3292
                peer_socket_address,
3293
                hsd,
3294
                false,
3295
                1,
3296
                block_5.header().timestamp,
3297
            );
3298
            peer_loop_handler
3299
                .run_wrapper(mock, from_main_rx_clone)
3300
                .await?;
3301

3302
            match to_main_rx1.recv().await {
3303
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3304
                    if blocks[0].hash() != block_2.hash() {
3305
                        bail!("1st received block by main loop must be block 1");
3306
                    }
3307
                    if blocks[1].hash() != block_3.hash() {
3308
                        bail!("2nd received block by main loop must be block 2");
3309
                    }
3310
                    if blocks[2].hash() != block_4.hash() {
3311
                        bail!("3rd received block by main loop must be block 3");
3312
                    }
3313
                }
3314
                _ => bail!("Did not find msg sent to main task"),
3315
            };
3316
            match to_main_rx1.recv().await {
3317
                Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3318
                _ => bail!("Must receive remove of peer block max height"),
3319
            }
3320

3321
            if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3322
                bail!("peer map must be empty after closing connection gracefully");
3323
            }
3324

3325
            Ok(())
3326
        }
3327

3328
        #[traced_test]
3329
        #[apply(shared_tokio_runtime)]
3330
        async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3331
            // In this scenario, the client knows the genesis block (block 0) and block 1, it
3332
            // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3333
            // But the requests are interrupted by the peer sending another message: a request
3334
            // for a list of peers.
3335

3336
            let network = Network::Main;
3337
            let (
3338
                _peer_broadcast_tx,
3339
                from_main_rx_clone,
3340
                to_main_tx,
3341
                mut to_main_rx1,
3342
                mut state_lock,
3343
                _hsd,
3344
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3345
            let genesis_block = Block::genesis(network);
3346
            let peer_infos: Vec<PeerInfo> = state_lock
3347
                .lock_guard()
3348
                .await
3349
                .net
3350
                .peer_map
3351
                .clone()
3352
                .into_values()
3353
                .collect::<Vec<_>>();
3354

3355
            let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3356
                &genesis_block,
3357
                Timestamp::hours(1),
3358
                StdRng::seed_from_u64(5550001).random(),
3359
                network,
3360
            )
3361
            .await;
3362
            state_lock.set_new_tip(block_1.clone()).await?;
3363

3364
            let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3365
            let expected_peer_list_resp = vec![
3366
                (
3367
                    peer_infos[0].listen_address().unwrap(),
3368
                    peer_infos[0].instance_id(),
3369
                ),
3370
                (sa_1, hsd_1.instance_id),
3371
            ];
3372
            let mock = Mock::new(vec![
3373
                Action::Read(PeerMessage::Block(Box::new(
3374
                    block_4.clone().try_into().unwrap(),
3375
                ))),
3376
                Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3377
                Action::Read(PeerMessage::Block(Box::new(
3378
                    block_3.clone().try_into().unwrap(),
3379
                ))),
3380
                Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3381
                //
3382
                // Now make the interruption of the block reconciliation process
3383
                Action::Read(PeerMessage::PeerListRequest),
3384
                //
3385
                // Answer the request for a peer list
3386
                Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3387
                //
3388
                // Complete the block reconciliation process by requesting the last block
3389
                // in this process, to get back to a mutually known block.
3390
                Action::Read(PeerMessage::Block(Box::new(
3391
                    block_2.clone().try_into().unwrap(),
3392
                ))),
3393
                Action::Read(PeerMessage::Bye),
3394
            ]);
3395

3396
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3397
                to_main_tx,
3398
                state_lock.clone(),
3399
                sa_1,
3400
                hsd_1,
3401
                true,
3402
                1,
3403
                block_4.header().timestamp,
3404
            );
3405
            peer_loop_handler
3406
                .run_wrapper(mock, from_main_rx_clone)
3407
                .await?;
3408

3409
            // Verify that blocks are sent to `main_loop` in expected ordering
3410
            match to_main_rx1.recv().await {
3411
                Some(PeerTaskToMain::NewBlocks(blocks)) => {
3412
                    if blocks[0].hash() != block_2.hash() {
3413
                        bail!("1st received block by main loop must be block 1");
3414
                    }
3415
                    if blocks[1].hash() != block_3.hash() {
3416
                        bail!("2nd received block by main loop must be block 2");
3417
                    }
3418
                    if blocks[2].hash() != block_4.hash() {
3419
                        bail!("3rd received block by main loop must be block 3");
3420
                    }
3421
                }
3422
                _ => bail!("Did not find msg sent to main task"),
3423
            };
3424

3425
            assert_eq!(
3426
                1,
3427
                state_lock.lock_guard().await.net.peer_map.len(),
3428
                "One peer must remain in peer list after peer_1 closed gracefully"
3429
            );
3430

3431
            Ok(())
3432
        }
3433
    }
3434

3435
    mod transactions {
3436
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToProofCollection;
3437
        use crate::main_loop::proof_upgrader::PrimitiveWitnessToSingleProof;
3438
        use crate::models::blockchain::transaction::primitive_witness::PrimitiveWitness;
3439
        use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
3440
        use crate::tests::shared::blocks::fake_deterministic_successor;
3441
        use crate::tests::shared::mock_tx::genesis_tx_with_proof_type;
3442
        use crate::triton_vm_job_queue::vm_job_queue;
3443

3444
        use super::*;
3445

3446
        #[traced_test]
3447
        #[apply(shared_tokio_runtime)]
3448
        async fn receive_transaction_request() {
3449
            let network = Network::Main;
3450
            let dummy_tx = invalid_empty_single_proof_transaction();
3451
            let txid = dummy_tx.kernel.txid();
3452

3453
            for transaction_is_known in [false, true] {
3454
                let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3455
                    get_test_genesis_setup(network, 1, cli_args::Args::default())
3456
                        .await
3457
                        .unwrap();
3458
                if transaction_is_known {
3459
                    state_lock
3460
                        .lock_guard_mut()
3461
                        .await
3462
                        .mempool_insert(dummy_tx.clone(), UpgradePriority::Irrelevant)
3463
                        .await;
3464
                }
3465

3466
                let mock = if transaction_is_known {
3467
                    Mock::new(vec![
3468
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3469
                        Action::Write(PeerMessage::Transaction(Box::new(
3470
                            (&dummy_tx).try_into().unwrap(),
3471
                        ))),
3472
                        Action::Read(PeerMessage::Bye),
3473
                    ])
3474
                } else {
3475
                    Mock::new(vec![
3476
                        Action::Read(PeerMessage::TransactionRequest(txid)),
3477
                        Action::Read(PeerMessage::Bye),
3478
                    ])
3479
                };
3480

3481
                let hsd = get_dummy_handshake_data_for_genesis(network);
3482
                let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3483
                let mut peer_loop_handler = PeerLoopHandler::new(
3484
                    to_main_tx,
3485
                    state_lock,
3486
                    get_dummy_socket_address(0),
3487
                    hsd,
3488
                    true,
3489
                    1,
3490
                );
3491

3492
                peer_loop_handler
3493
                    .run(mock, from_main_rx, &mut peer_state)
3494
                    .await
3495
                    .unwrap();
3496
            }
3497
        }
3498

3499
        #[traced_test]
3500
        #[apply(shared_tokio_runtime)]
3501
        async fn empty_mempool_request_tx_test() {
3502
            // In this scenario the client receives a transaction notification from
3503
            // a peer of a transaction it doesn't know; the client must then request it.
3504

3505
            let network = Network::Main;
3506
            let (
3507
                _peer_broadcast_tx,
3508
                from_main_rx_clone,
3509
                to_main_tx,
3510
                mut to_main_rx1,
3511
                state_lock,
3512
                _hsd,
3513
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3514
                .await
3515
                .unwrap();
3516

3517
            let spending_key = state_lock
3518
                .lock_guard()
3519
                .await
3520
                .wallet_state
3521
                .wallet_entropy
3522
                .nth_symmetric_key_for_tests(0);
3523
            let genesis_block = Block::genesis(network);
3524
            let now = genesis_block.kernel.header.timestamp;
3525
            let config = TxCreationConfig::default()
3526
                .recover_change_off_chain(spending_key.into())
3527
                .with_prover_capability(TxProvingCapability::ProofCollection);
3528
            let transaction_1: Transaction = state_lock
3529
                .api()
3530
                .tx_initiator_internal()
3531
                .create_transaction(
3532
                    Default::default(),
3533
                    NativeCurrencyAmount::coins(0),
3534
                    now,
3535
                    config,
3536
                )
3537
                .await
3538
                .unwrap()
3539
                .transaction
3540
                .into();
3541

3542
            // Build the resulting transaction notification
3543
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3544
            let mock = Mock::new(vec![
3545
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3546
                Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3547
                Action::Read(PeerMessage::Transaction(Box::new(
3548
                    (&transaction_1).try_into().unwrap(),
3549
                ))),
3550
                Action::Read(PeerMessage::Bye),
3551
            ]);
3552

3553
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3554

3555
            // Mock a timestamp to allow transaction to be considered valid
3556
            let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3557
                to_main_tx,
3558
                state_lock.clone(),
3559
                get_dummy_socket_address(0),
3560
                hsd_1.clone(),
3561
                true,
3562
                1,
3563
                now,
3564
            );
3565

3566
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3567

3568
            assert!(
3569
                state_lock.lock_guard().await.mempool.is_empty(),
3570
                "Mempool must be empty at init"
3571
            );
3572
            peer_loop_handler
3573
                .run(mock, from_main_rx_clone, &mut peer_state)
3574
                .await
3575
                .unwrap();
3576

3577
            // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3578
            // by the `main_loop`.
3579
            match to_main_rx1.recv().await {
3580
                Some(PeerTaskToMain::Transaction(_)) => (),
3581
                _ => panic!("Must receive remove of peer block max height"),
3582
            };
3583
        }
3584

3585
        #[traced_test]
3586
        #[apply(shared_tokio_runtime)]
3587
        async fn populated_mempool_request_tx_test() -> Result<()> {
3588
            // In this scenario the peer is informed of a transaction that it already knows
3589

3590
            let network = Network::Main;
3591
            let (
3592
                _peer_broadcast_tx,
3593
                from_main_rx_clone,
3594
                to_main_tx,
3595
                mut to_main_rx1,
3596
                mut state_lock,
3597
                _hsd,
3598
            ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3599
                .await
3600
                .unwrap();
3601
            let spending_key = state_lock
3602
                .lock_guard()
3603
                .await
3604
                .wallet_state
3605
                .wallet_entropy
3606
                .nth_symmetric_key_for_tests(0);
3607

3608
            let genesis_block = Block::genesis(network);
3609
            let now = genesis_block.kernel.header.timestamp;
3610
            let config = TxCreationConfig::default()
3611
                .recover_change_off_chain(spending_key.into())
3612
                .with_prover_capability(TxProvingCapability::ProofCollection);
3613
            let transaction_1: Transaction = state_lock
3614
                .api()
3615
                .tx_initiator_internal()
3616
                .create_transaction(
3617
                    Default::default(),
3618
                    NativeCurrencyAmount::coins(0),
3619
                    now,
3620
                    config,
3621
                )
3622
                .await
3623
                .unwrap()
3624
                .transaction
3625
                .into();
3626

3627
            let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3628
            let mut peer_loop_handler = PeerLoopHandler::new(
3629
                to_main_tx,
3630
                state_lock.clone(),
3631
                get_dummy_socket_address(0),
3632
                hsd_1.clone(),
3633
                true,
3634
                1,
3635
            );
3636
            let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3637

3638
            assert!(
3639
                state_lock.lock_guard().await.mempool.is_empty(),
3640
                "Mempool must be empty at init"
3641
            );
3642
            state_lock
3643
                .lock_guard_mut()
3644
                .await
3645
                .mempool_insert(transaction_1.clone(), UpgradePriority::Irrelevant)
3646
                .await;
3647
            assert!(
3648
                !state_lock.lock_guard().await.mempool.is_empty(),
3649
                "Mempool must be non-empty after insertion"
3650
            );
3651

3652
            // Run the peer loop and verify expected exchange -- namely that the
3653
            // tx notification is received and the the transaction is *not*
3654
            // requested.
3655
            let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3656
            let mock = Mock::new(vec![
3657
                Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3658
                Action::Read(PeerMessage::Bye),
3659
            ]);
3660
            peer_loop_handler
3661
                .run(mock, from_main_rx_clone, &mut peer_state)
3662
                .await
3663
                .unwrap();
3664

3665
            // nothing is allowed to be sent to `main_loop`
3666
            match to_main_rx1.try_recv() {
3667
                Err(TryRecvError::Empty) => (),
3668
                Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3669
                Ok(_) => panic!("to_main channel must be empty"),
3670
            };
3671
            Ok(())
3672
        }
3673

3674
        #[traced_test]
3675
        #[apply(shared_tokio_runtime)]
3676
        async fn accepts_tx_with_updated_mutator_set() {
3677
            // Scenario: node has transaction in mempool and receives
3678
            // transaction notification for the same transaction with an updated
3679
            // mutator set. The node must request the new transaction and it
3680
            // must be passed on to main loop.
3681
            //
3682
            // Both ProofCollection and SingleProof backed transactions are
3683
            // tested.
3684

3685
            enum ProofType {
3686
                ProofCollection,
3687
                SingleProof,
3688
            }
3689

3690
            let network = Network::Main;
3691

3692
            for proof_type in [ProofType::ProofCollection, ProofType::SingleProof] {
3693
                let proof_job_options = TritonVmProofJobOptions::default();
3694
                let upgrade = async |primitive_witness: PrimitiveWitness| match proof_type {
3695
                    ProofType::ProofCollection => {
3696
                        PrimitiveWitnessToProofCollection { primitive_witness }
3697
                            .upgrade(vm_job_queue(), &proof_job_options)
3698
                            .await
3699
                            .unwrap()
3700
                    }
3701
                    ProofType::SingleProof => PrimitiveWitnessToSingleProof { primitive_witness }
3702
                        .upgrade(vm_job_queue(), &proof_job_options)
3703
                        .await
3704
                        .unwrap(),
3705
                };
3706

3707
                let (
3708
                    _peer_broadcast_tx,
3709
                    from_main_rx_clone,
3710
                    to_main_tx,
3711
                    mut to_main_rx1,
3712
                    mut state_lock,
3713
                    _hsd,
3714
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3715
                    .await
3716
                    .unwrap();
3717
                let fee = NativeCurrencyAmount::from_nau(500);
3718
                let pw_genesis =
3719
                    genesis_tx_with_proof_type(TxProvingCapability::PrimitiveWitness, network, fee)
3720
                        .await
3721
                        .proof
3722
                        .clone()
3723
                        .into_primitive_witness();
3724

3725
                let tx_synced_to_genesis = upgrade(pw_genesis.clone()).await;
3726

3727
                let genesis_block = Block::genesis(network);
3728
                let block1 = fake_deterministic_successor(&genesis_block, network).await;
3729
                state_lock
3730
                    .lock_guard_mut()
3731
                    .await
3732
                    .set_new_tip(block1.clone())
3733
                    .await
3734
                    .unwrap();
3735

3736
                state_lock
3737
                    .lock_guard_mut()
3738
                    .await
3739
                    .mempool_insert(tx_synced_to_genesis, UpgradePriority::Irrelevant)
3740
                    .await;
3741

3742
                // Mempool should now contain the unsynced transaction. Tip is block 1.
3743
                let pw_block1 =
3744
                    pw_genesis.update_with_new_ms_data(block1.mutator_set_update().unwrap());
3745
                let tx_synced_to_block1 = upgrade(pw_block1).await;
3746

3747
                let tx_notification: TransactionNotification =
3748
                    (&tx_synced_to_block1).try_into().unwrap();
3749
                let mock = Mock::new(vec![
3750
                    Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3751
                    Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3752
                    Action::Read(PeerMessage::Transaction(Box::new(
3753
                        (&tx_synced_to_block1).try_into().unwrap(),
3754
                    ))),
3755
                    Action::Read(PeerMessage::Bye),
3756
                ]);
3757

3758
                // Mock a timestamp to allow transaction to be considered valid
3759
                let now = tx_synced_to_block1.kernel.timestamp;
3760
                let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3761
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3762
                    to_main_tx,
3763
                    state_lock.clone(),
3764
                    get_dummy_socket_address(0),
3765
                    hsd_1.clone(),
3766
                    true,
3767
                    1,
3768
                    now,
3769
                );
3770

3771
                let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3772
                peer_loop_handler
3773
                    .run(mock, from_main_rx_clone, &mut peer_state)
3774
                    .await
3775
                    .unwrap();
3776

3777
                // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3778
                // by the `main_loop`.
3779
                match to_main_rx1.recv().await {
3780
                    Some(PeerTaskToMain::Transaction(_)) => (),
3781
                    _ => panic!("Main loop must receive new transaction"),
3782
                };
3783
            }
3784
        }
3785
    }
3786

3787
    mod block_proposals {
3788
        use super::*;
3789

3790
        struct TestSetup {
3791
            peer_loop_handler: PeerLoopHandler,
3792
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3793
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3794
            peer_state: MutablePeerState,
3795
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3796
            genesis_block: Block,
3797
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3798
        }
3799

3800
        async fn genesis_setup(network: Network) -> TestSetup {
3801
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
3802
                get_test_genesis_setup(network, 0, cli_args::Args::default())
3803
                    .await
3804
                    .unwrap();
3805
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
3806
            let peer_loop_handler = PeerLoopHandler::new(
3807
                to_main_tx.clone(),
3808
                alice.clone(),
3809
                get_dummy_socket_address(0),
3810
                peer_hsd.clone(),
3811
                true,
3812
                1,
3813
            );
3814
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
3815

3816
            // (peer_loop_handler, to_main_rx1)
3817
            TestSetup {
3818
                peer_broadcast_tx,
3819
                peer_loop_handler,
3820
                to_main_rx,
3821
                from_main_rx,
3822
                peer_state,
3823
                to_main_tx,
3824
                genesis_block: Block::genesis(network),
3825
            }
3826
        }
3827

3828
        #[traced_test]
3829
        #[apply(shared_tokio_runtime)]
3830
        async fn accept_block_proposal_height_one() {
3831
            // Node knows genesis block, receives a block proposal for block 1
3832
            // and must accept this. Verify that main loop is informed of block
3833
            // proposal.
3834
            let TestSetup {
3835
                peer_broadcast_tx,
3836
                mut peer_loop_handler,
3837
                mut to_main_rx,
3838
                from_main_rx,
3839
                mut peer_state,
3840
                to_main_tx,
3841
                genesis_block,
3842
            } = genesis_setup(Network::Main).await;
3843
            let block1 = fake_valid_block_for_tests(
3844
                &peer_loop_handler.global_state_lock,
3845
                StdRng::seed_from_u64(5550001).random(),
3846
            )
3847
            .await;
3848

3849
            let mock = Mock::new(vec![
3850
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
3851
                Action::Read(PeerMessage::Bye),
3852
            ]);
3853
            peer_loop_handler
3854
                .run(mock, from_main_rx, &mut peer_state)
3855
                .await
3856
                .unwrap();
3857

3858
            match to_main_rx.try_recv().unwrap() {
3859
                PeerTaskToMain::BlockProposal(block) => {
3860
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
3861
                }
3862
                _ => panic!("Expected main loop to be informed of block proposal"),
3863
            };
3864

3865
            drop(to_main_tx);
3866
            drop(peer_broadcast_tx);
3867
        }
3868

3869
        #[traced_test]
3870
        #[apply(shared_tokio_runtime)]
3871
        async fn accept_block_proposal_notification_height_one() {
3872
            // Node knows genesis block, receives a block proposal notification
3873
            // for block 1 and must accept this by requesting the block
3874
            // proposal from peer.
3875
            let TestSetup {
3876
                peer_broadcast_tx,
3877
                mut peer_loop_handler,
3878
                to_main_rx: _,
3879
                from_main_rx,
3880
                mut peer_state,
3881
                to_main_tx,
3882
                ..
3883
            } = genesis_setup(Network::Main).await;
3884
            let block1 = fake_valid_block_for_tests(
3885
                &peer_loop_handler.global_state_lock,
3886
                StdRng::seed_from_u64(5550001).random(),
3887
            )
3888
            .await;
3889

3890
            let mock = Mock::new(vec![
3891
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
3892
                Action::Write(PeerMessage::BlockProposalRequest(
3893
                    BlockProposalRequest::new(block1.body().mast_hash()),
3894
                )),
3895
                Action::Read(PeerMessage::Bye),
3896
            ]);
3897
            peer_loop_handler
3898
                .run(mock, from_main_rx, &mut peer_state)
3899
                .await
3900
                .unwrap();
3901

3902
            drop(to_main_tx);
3903
            drop(peer_broadcast_tx);
3904
        }
3905
    }
3906

3907
    mod proof_qualities {
3908
        use strum::IntoEnumIterator;
3909

3910
        use super::*;
3911
        use crate::config_models::cli_args;
3912
        use crate::models::blockchain::transaction::Transaction;
3913
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3914
        use crate::models::state::wallet::transaction_output::TxOutput;
3915
        use crate::tests::shared::globalstate::mock_genesis_global_state;
3916

3917
        async fn tx_of_proof_quality(
3918
            network: Network,
3919
            quality: TransactionProofQuality,
3920
        ) -> Transaction {
3921
            let wallet_secret = WalletEntropy::devnet_wallet();
3922
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
3923
            let alice_gsl = mock_genesis_global_state(
3924
                1,
3925
                wallet_secret,
3926
                cli_args::Args::default_with_network(network),
3927
            )
3928
            .await;
3929
            let alice = alice_gsl.lock_guard().await;
3930
            let genesis_block = alice.chain.light_state();
3931
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
3932
            let prover_capability = match quality {
3933
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
3934
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
3935
            };
3936
            let config = TxCreationConfig::default()
3937
                .recover_change_off_chain(alice_key.into())
3938
                .with_prover_capability(prover_capability);
3939
            alice_gsl
3940
                .api()
3941
                .tx_initiator_internal()
3942
                .create_transaction(
3943
                    Vec::<TxOutput>::new().into(),
3944
                    NativeCurrencyAmount::coins(1),
3945
                    in_seven_months,
3946
                    config,
3947
                )
3948
                .await
3949
                .unwrap()
3950
                .transaction()
3951
                .clone()
3952
        }
3953

3954
        #[traced_test]
3955
        #[apply(shared_tokio_runtime)]
3956
        async fn client_favors_higher_proof_quality() {
3957
            // In this scenario the peer is informed of a transaction that it
3958
            // already knows, and it's tested that it checks the proof quality
3959
            // field and verifies that it exceeds the proof in the mempool
3960
            // before requesting the transasction.
3961
            let network = Network::Main;
3962
            let proof_collection_tx =
3963
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
3964
            let single_proof_tx =
3965
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
3966

3967
            for (own_tx_pq, new_tx_pq) in
3968
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
3969
            {
3970
                use TransactionProofQuality::*;
3971

3972
                let (
3973
                    _peer_broadcast_tx,
3974
                    from_main_rx_clone,
3975
                    to_main_tx,
3976
                    mut to_main_rx1,
3977
                    mut alice,
3978
                    handshake_data,
3979
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3980
                    .await
3981
                    .unwrap();
3982

3983
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
3984
                    (ProofCollection, ProofCollection) => {
3985
                        (&proof_collection_tx, &proof_collection_tx)
3986
                    }
3987
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
3988
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
3989
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
3990
                };
3991

3992
                alice
3993
                    .lock_guard_mut()
3994
                    .await
3995
                    .mempool_insert((*own_tx).to_owned(), UpgradePriority::Irrelevant)
3996
                    .await;
3997

3998
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
3999

4000
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4001
                let mock = if own_proof_is_supreme {
4002
                    Mock::new(vec![
4003
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
4004
                        Action::Read(PeerMessage::Bye),
4005
                    ])
4006
                } else {
4007
                    Mock::new(vec![
4008
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
4009
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
4010
                        Action::Read(PeerMessage::Transaction(Box::new(
4011
                            new_tx.try_into().unwrap(),
4012
                        ))),
4013
                        Action::Read(PeerMessage::Bye),
4014
                    ])
4015
                };
4016

4017
                let now = proof_collection_tx.kernel.timestamp;
4018
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4019
                    to_main_tx,
4020
                    alice.clone(),
4021
                    get_dummy_socket_address(0),
4022
                    handshake_data.clone(),
4023
                    true,
4024
                    1,
4025
                    now,
4026
                );
4027
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4028

4029
                peer_loop_handler
4030
                    .run(mock, from_main_rx_clone, &mut peer_state)
4031
                    .await
4032
                    .unwrap();
4033

4034
                if own_proof_is_supreme {
4035
                    match to_main_rx1.try_recv() {
4036
                        Err(TryRecvError::Empty) => (),
4037
                        Err(TryRecvError::Disconnected) => {
4038
                            panic!("to_main channel must still be open")
4039
                        }
4040
                        Ok(_) => panic!("to_main channel must be empty"),
4041
                    }
4042
                } else {
4043
                    match to_main_rx1.try_recv() {
4044
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
4045
                        Err(TryRecvError::Disconnected) => {
4046
                            panic!("to_main channel must still be open")
4047
                        }
4048
                        Ok(PeerTaskToMain::Transaction(_)) => (),
4049
                        _ => panic!("Unexpected result from channel"),
4050
                    }
4051
                }
4052
            }
4053
        }
4054
    }
4055

4056
    mod sync_challenges {
4057
        use super::*;
4058
        use crate::tests::shared::blocks::fake_valid_sequence_of_blocks_for_tests_dyn;
4059

4060
        #[traced_test]
4061
        #[apply(shared_tokio_runtime)]
4062
        async fn bad_sync_challenge_height_greater_than_tip() {
4063
            // Criterium: Challenge height may not exceed that of tip in the
4064
            // request.
4065

4066
            let network = Network::Main;
4067
            let (
4068
                _alice_main_to_peer_tx,
4069
                alice_main_to_peer_rx,
4070
                alice_peer_to_main_tx,
4071
                alice_peer_to_main_rx,
4072
                mut alice,
4073
                alice_hsd,
4074
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
4075
                .await
4076
                .unwrap();
4077
            let genesis_block: Block = Block::genesis(network);
4078
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
4079
                &genesis_block,
4080
                Timestamp::hours(1),
4081
                Default::default(),
4082
                network,
4083
            )
4084
            .await;
4085
            for block in &blocks {
4086
                alice.set_new_tip(block.clone()).await.unwrap();
4087
            }
4088

4089
            let bh12 = blocks.last().unwrap().header().height;
4090
            let sync_challenge = SyncChallenge {
4091
                tip_digest: blocks[9].hash(),
4092
                challenges: [bh12; 10],
4093
            };
4094
            let alice_p2p_messages = Mock::new(vec![
4095
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4096
                Action::Read(PeerMessage::Bye),
4097
            ]);
4098

4099
            let peer_address = get_dummy_socket_address(0);
4100
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4101
                alice_peer_to_main_tx.clone(),
4102
                alice.clone(),
4103
                peer_address,
4104
                alice_hsd,
4105
                false,
4106
                1,
4107
            );
4108
            alice_peer_loop_handler
4109
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4110
                .await
4111
                .unwrap();
4112

4113
            drop(alice_peer_to_main_rx);
4114

4115
            let latest_sanction = alice
4116
                .lock_guard()
4117
                .await
4118
                .net
4119
                .get_peer_standing_from_database(peer_address.ip())
4120
                .await
4121
                .unwrap();
4122
            assert_eq!(
4123
                NegativePeerSanction::InvalidSyncChallenge,
4124
                latest_sanction
4125
                    .latest_punishment
4126
                    .expect("peer must be sanctioned")
4127
                    .0
4128
            );
4129
        }
4130

4131
        #[traced_test]
4132
        #[apply(shared_tokio_runtime)]
4133
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
4134
            // Criterium: Challenge may not point to genesis block, or block 1, as
4135
            // tip.
4136

4137
            let network = Network::Main;
4138
            let genesis_block: Block = Block::genesis(network);
4139

4140
            let alice_cli = cli_args::Args::default();
4141
            let (
4142
                _alice_main_to_peer_tx,
4143
                alice_main_to_peer_rx,
4144
                alice_peer_to_main_tx,
4145
                alice_peer_to_main_rx,
4146
                alice,
4147
                alice_hsd,
4148
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
4149

4150
            let sync_challenge = SyncChallenge {
4151
                tip_digest: genesis_block.hash(),
4152
                challenges: [BlockHeight::genesis(); 10],
4153
            };
4154

4155
            let alice_p2p_messages = Mock::new(vec![
4156
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
4157
                Action::Read(PeerMessage::Bye),
4158
            ]);
4159

4160
            let peer_address = get_dummy_socket_address(0);
4161
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
4162
                alice_peer_to_main_tx.clone(),
4163
                alice.clone(),
4164
                peer_address,
4165
                alice_hsd,
4166
                false,
4167
                1,
4168
            );
4169
            alice_peer_loop_handler
4170
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4171
                .await
4172
                .unwrap();
4173

4174
            drop(alice_peer_to_main_rx);
4175

4176
            let latest_sanction = alice
4177
                .lock_guard()
4178
                .await
4179
                .net
4180
                .get_peer_standing_from_database(peer_address.ip())
4181
                .await
4182
                .unwrap();
4183
            assert_eq!(
4184
                NegativePeerSanction::InvalidSyncChallenge,
4185
                latest_sanction
4186
                    .latest_punishment
4187
                    .expect("peer must be sanctioned")
4188
                    .0
4189
            );
4190
        }
4191

4192
        #[traced_test]
4193
        #[apply(shared_tokio_runtime)]
4194
        async fn sync_challenge_happy_path() -> Result<()> {
4195
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
4196
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
4197
            // sync mode.
4198

4199
            let mut rng = rand::rng();
4200
            let network = Network::Main;
4201
            let genesis_block: Block = Block::genesis(network);
4202

4203
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
4204
            let alice_cli = cli_args::Args {
4205
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
4206
                ..Default::default()
4207
            };
4208
            let (
4209
                _alice_main_to_peer_tx,
4210
                alice_main_to_peer_rx,
4211
                alice_peer_to_main_tx,
4212
                mut alice_peer_to_main_rx,
4213
                mut alice,
4214
                alice_hsd,
4215
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
4216
            let _alice_socket_address = get_dummy_socket_address(0);
4217

4218
            let (
4219
                _bob_main_to_peer_tx,
4220
                _bob_main_to_peer_rx,
4221
                _bob_peer_to_main_tx,
4222
                _bob_peer_to_main_rx,
4223
                mut bob,
4224
                _bob_hsd,
4225
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
4226
            let bob_socket_address = get_dummy_socket_address(0);
4227

4228
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
4229
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
4230
            assert!(
4231
                block_1.is_valid(&genesis_block, now, network).await,
4232
                "Block must be valid for this test to make sense"
4233
            );
4234
            let alice_tip = &block_1;
4235
            alice.set_new_tip(block_1.clone()).await?;
4236
            bob.set_new_tip(block_1.clone()).await?;
4237

4238
            // produce enough blocks to ensure alice needs to go into sync mode
4239
            // with this block notification.
4240
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
4241
                &block_1,
4242
                network.target_block_interval(),
4243
                (0..rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20))
4244
                    .map(|_| rng.random())
4245
                    .collect_vec(),
4246
                network,
4247
            )
4248
            .await;
4249
            for block in &blocks {
4250
                bob.set_new_tip(block.clone()).await?;
4251
            }
4252
            let bob_tip = blocks.last().unwrap();
4253

4254
            let block_notification_from_bob = PeerBlockNotification {
4255
                hash: bob_tip.hash(),
4256
                height: bob_tip.header().height,
4257
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
4258
            };
4259

4260
            let alice_rng_seed = rng.random::<[u8; 32]>();
4261
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
4262
            let sync_challenge_from_alice = SyncChallenge::generate(
4263
                &block_notification_from_bob,
4264
                alice_tip.header().height,
4265
                alice_rng_clone.random(),
4266
            );
4267

4268
            println!(
4269
                "sync challenge from alice:\n{:?}",
4270
                sync_challenge_from_alice
4271
            );
4272

4273
            let sync_challenge_response_from_bob = bob
4274
                .lock_guard()
4275
                .await
4276
                .response_to_sync_challenge(sync_challenge_from_alice)
4277
                .await
4278
                .expect("should be able to respond to sync challenge");
4279

4280
            let alice_p2p_messages = Mock::new(vec![
4281
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4282
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
4283
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
4284
                    sync_challenge_response_from_bob,
4285
                ))),
4286
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4287
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
4288
                // The absence of a Write here checks that a 2nd challenge isn't sent
4289
                // when a successful was just received.
4290
                Action::Read(PeerMessage::Bye),
4291
            ]);
4292

4293
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
4294
                alice_peer_to_main_tx.clone(),
4295
                alice.clone(),
4296
                bob_socket_address,
4297
                alice_hsd,
4298
                false,
4299
                1,
4300
                bob_tip.header().timestamp,
4301
            );
4302
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
4303
            alice_peer_loop_handler
4304
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4305
                .await?;
4306

4307
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
4308
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
4309
            expected_anchor_mmra.append(bob_tip.hash());
4310
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
4311
                peer_address: bob_socket_address,
4312
                claimed_height: bob_tip.header().height,
4313
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
4314
                claimed_block_mmra: expected_anchor_mmra,
4315
            };
4316
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
4317
            assert_eq!(
4318
                expected_message_from_alice_peer_loop,
4319
                observed_message_from_alice_peer_loop
4320
            );
4321

4322
            Ok(())
4323
        }
4324
    }
4325
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc