• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 15570342326

10 Jun 2025 09:09PM UTC coverage: 71.76% (-0.09%) from 71.85%
15570342326

Pull #609

github

web-flow
Merge 6d3fd907c into e78ccf5b5
Pull Request #609: feat: utxo_origin_block call

0 of 17 new or added lines in 1 file covered. (0.0%)

13 existing lines in 2 files now uncovered.

20171 of 28109 relevant lines covered (71.76%)

415231.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.47
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
66
const MAX_PEER_LIST_LENGTH: usize = 10;
67
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
68

69
const KEEP_CONNECTION_ALIVE: bool = false;
70
const DISCONNECT_CONNECTION: bool = true;
71

72
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP
75
///
76
/// also handles messages from main task over the main-to-peer-tasks broadcast
77
/// channel.
78
#[derive(Debug, Clone)]
79
pub struct PeerLoopHandler {
80
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
81
    global_state_lock: GlobalStateLock,
82
    peer_address: SocketAddr,
83
    peer_handshake_data: HandshakeData,
84
    inbound_connection: bool,
85
    distance: u8,
86
    rng: StdRng,
87
    #[cfg(test)]
88
    mock_now: Option<Timestamp>,
89
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
28✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
28✔
94
        global_state_lock: GlobalStateLock,
28✔
95
        peer_address: SocketAddr,
28✔
96
        peer_handshake_data: HandshakeData,
28✔
97
        inbound_connection: bool,
28✔
98
        distance: u8,
28✔
99
    ) -> Self {
28✔
100
        Self {
28✔
101
            to_main_tx,
28✔
102
            global_state_lock,
28✔
103
            peer_address,
28✔
104
            peer_handshake_data,
28✔
105
            inbound_connection,
28✔
106
            distance,
28✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
28✔
108
            #[cfg(test)]
28✔
109
            mock_now: None,
28✔
110
        }
28✔
111
    }
28✔
112

113
    /// Allows for mocked timestamps such that time dependencies may be tested.
114
    #[cfg(test)]
115
    pub(crate) fn with_mocked_time(
20✔
116
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
20✔
117
        global_state_lock: GlobalStateLock,
20✔
118
        peer_address: SocketAddr,
20✔
119
        peer_handshake_data: HandshakeData,
20✔
120
        inbound_connection: bool,
20✔
121
        distance: u8,
20✔
122
        mocked_time: Timestamp,
20✔
123
    ) -> Self {
20✔
124
        Self {
20✔
125
            to_main_tx,
20✔
126
            global_state_lock,
20✔
127
            peer_address,
20✔
128
            peer_handshake_data,
20✔
129
            inbound_connection,
20✔
130
            distance,
20✔
131
            mock_now: Some(mocked_time),
20✔
132
            rng: StdRng::from_rng(&mut rand::rng()),
20✔
133
        }
20✔
134
    }
20✔
135

136
    /// Overwrite the random number generator object with a specific one.
137
    ///
138
    /// Useful for derandomizing tests.
139
    #[cfg(test)]
140
    fn set_rng(&mut self, rng: StdRng) {
1✔
141
        self.rng = rng;
1✔
142
    }
1✔
143

144
    fn now(&self) -> Timestamp {
54✔
145
        #[cfg(not(test))]
146
        {
147
            Timestamp::now()
27✔
148
        }
149
        #[cfg(test)]
150
        {
151
            self.mock_now.unwrap_or(Timestamp::now())
27✔
152
        }
153
    }
54✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
19✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
19✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
19✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
19✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
18✔
199
        if sanction_result.is_err() {
18✔
200
            error!("Cannot reward banned peer");
×
201
        }
18✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
18✔
204
    }
19✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
255
    /// valid. Use with a list of blocks or a single block. When the
256
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
257
    /// list is the `i`th block. The parent of element zero in this list is
258
    /// `parent_of_first_block`.
259
    ///
260
    /// # Return Value
261
    ///  - `Err` when the connection should be closed;
262
    ///  - `Ok(None)` if some block is invalid
263
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
264
    ///    are not syncing;
265
    ///  - `Ok(None)` if the last block has insufficient height and we are
266
    ///    syncing;
267
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
268
    ///    highest height in the batch.
269
    ///
270
    /// A return value of Ok(Some(_)) means that the message was passed on to
271
    /// main loop.
272
    ///
273
    /// # Locking
274
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
275
    ///     `self.reward(..)`.
276
    ///
277
    /// # Panics
278
    ///
279
    ///  - Panics if called with the empty list.
280
    async fn handle_blocks(
8✔
281
        &mut self,
8✔
282
        received_blocks: Vec<Block>,
8✔
283
        parent_of_first_block: Block,
8✔
284
    ) -> Result<Option<BlockHeight>> {
20✔
285
        debug!(
20✔
286
            "attempting to validate {} {}",
×
287
            received_blocks.len(),
×
288
            if received_blocks.len() == 1 {
×
289
                "block"
×
290
            } else {
291
                "blocks"
×
292
            }
293
        );
294
        let now = self.now();
20✔
295
        debug!("validating with respect to current timestamp {now}");
20✔
296
        let mut previous_block = &parent_of_first_block;
20✔
297
        for new_block in &received_blocks {
48✔
298
            let new_block_has_proof_of_work = new_block.has_proof_of_work(
29✔
299
                self.global_state_lock.cli().network,
29✔
300
                previous_block.header(),
29✔
301
            );
302
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
29✔
303
            let new_block_is_valid = new_block
29✔
304
                .is_valid(previous_block, now, self.global_state_lock.cli().network)
29✔
305
                .await;
29✔
306
            debug!("new block is valid? {new_block_is_valid}");
29✔
307
            if !new_block_has_proof_of_work {
29✔
308
                warn!(
1✔
309
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
310
                    new_block.kernel.header.height, self.peer_address
×
311
                );
312
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
313
                warn!(
1✔
314
                    "Proof of work should be {} (or more) but was [{}].",
×
315
                    previous_block.kernel.header.difficulty.target(),
×
316
                    new_block.hash().values().iter().join(", ")
×
317
                );
318
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
319
                    new_block.kernel.header.height,
1✔
320
                    new_block.hash(),
1✔
321
                )))
1✔
322
                .await?;
1✔
323
                warn!("Failed to validate block due to insufficient PoW");
1✔
324
                return Ok(None);
1✔
325
            } else if !new_block_is_valid {
28✔
326
                warn!(
×
327
                    "Received invalid block of height {} from peer with IP {}",
×
328
                    new_block.kernel.header.height, self.peer_address
×
329
                );
330
                self.punish(NegativePeerSanction::InvalidBlock((
×
331
                    new_block.kernel.header.height,
×
332
                    new_block.hash(),
×
333
                )))
×
334
                .await?;
×
335
                warn!("Failed to validate block: invalid block");
×
336
                return Ok(None);
×
337
            }
28✔
338
            info!(
28✔
339
                "Block with height {} is valid. mined: {}",
×
340
                new_block.kernel.header.height,
×
341
                new_block.kernel.header.timestamp.standard_format()
×
342
            );
343

344
            previous_block = new_block;
28✔
345
        }
346

347
        // evaluate the fork choice rule
348
        debug!("Checking last block's canonicity ...");
19✔
349
        let last_block = received_blocks.last().unwrap();
19✔
350
        let is_canonical = self
19✔
351
            .global_state_lock
19✔
352
            .lock_guard()
19✔
353
            .await
19✔
354
            .incoming_block_is_more_canonical(last_block);
19✔
355
        let last_block_height = last_block.header().height;
19✔
356
        let sync_mode_active_and_have_new_champion = self
19✔
357
            .global_state_lock
19✔
358
            .lock_guard()
19✔
359
            .await
19✔
360
            .net
361
            .sync_anchor
362
            .as_ref()
19✔
363
            .is_some_and(|x| {
19✔
364
                x.champion
×
365
                    .is_none_or(|(height, _)| height < last_block_height)
×
366
            });
×
367
        if !is_canonical && !sync_mode_active_and_have_new_champion {
19✔
368
            warn!(
1✔
369
                "Received {} blocks from peer but incoming blocks are less \
×
370
            canonical than current tip, or current sync-champion.",
×
371
                received_blocks.len()
×
372
            );
373
            return Ok(None);
1✔
374
        }
18✔
375

376
        // Send the new blocks to the main task which handles the state update
377
        // and storage to the database.
378
        let number_of_received_blocks = received_blocks.len();
18✔
379
        self.to_main_tx
18✔
380
            .send(PeerTaskToMain::NewBlocks(received_blocks))
18✔
381
            .await?;
18✔
382
        info!(
18✔
383
            "Updated block info by block from peer. block height {}",
×
384
            last_block_height
385
        );
386

387
        // Valuable, new, hard-to-produce information. Reward peer.
388
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
18✔
389
            .await?;
18✔
390

391
        Ok(Some(last_block_height))
18✔
392
    }
20✔
393

394
    /// Take a single block received from a peer and (attempt to) find a path
395
    /// between the received block and a common ancestor stored in the blocks
396
    /// database.
397
    ///
398
    /// This function attempts to find the parent of the received block, either
399
    /// by searching the database or by requesting it from a peer.
400
    ///  - If the parent is not stored, it is requested from the peer and the
401
    ///    received block is pushed to the fork reconciliation list for later
402
    ///    handling by this function. The fork reconciliation list starts out
403
    ///    empty, but grows as more parents are requested and transmitted.
404
    ///  - If the parent is found in the database, a) block handling continues:
405
    ///    the entire list of fork reconciliation blocks are passed down the
406
    ///    pipeline, potentially leading to a state update; and b) the fork
407
    ///    reconciliation list is cleared.
408
    ///
409
    /// Locking:
410
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
411
    ///     `self.reward(..)`.
412
    async fn try_ensure_path<S>(
32✔
413
        &mut self,
32✔
414
        received_block: Box<Block>,
32✔
415
        peer: &mut S,
32✔
416
        peer_state: &mut MutablePeerState,
32✔
417
    ) -> Result<()>
32✔
418
    where
32✔
419
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
32✔
420
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
32✔
421
        <S as TryStream>::Error: std::error::Error,
32✔
422
    {
32✔
423
        // Does the received block match the fork reconciliation list?
424
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
32✔
425
            peer_state.fork_reconciliation_blocks.last()
32✔
426
        {
427
            let valid = successor
10✔
428
                .is_valid(
10✔
429
                    received_block.as_ref(),
10✔
430
                    self.now(),
10✔
431
                    self.global_state_lock.cli().network,
10✔
432
                )
10✔
433
                .await;
10✔
434
            if !valid {
10✔
435
                warn!(
×
436
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
437
                        peer_state.fork_reconciliation_blocks.len() + 1
×
438
                    );
439
            }
10✔
440
            valid
10✔
441
        } else {
442
            true
22✔
443
        };
444

445
        // Are we running out of RAM?
446
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
32✔
447
            >= self.global_state_lock.cli().sync_mode_threshold;
32✔
448
        if too_many_blocks {
32✔
449
            warn!(
1✔
450
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
451
                peer_state.fork_reconciliation_blocks.len() + 1
×
452
            );
453
        }
31✔
454

455
        // Block mismatch or too many blocks: abort!
456
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
32✔
457
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
458
                received_block.header().height,
1✔
459
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
460
                received_block.hash(),
1✔
461
            )))
1✔
462
            .await?;
1✔
463
            peer_state.fork_reconciliation_blocks = vec![];
1✔
464
            return Ok(());
1✔
465
        }
31✔
466

467
        // otherwise, append
468
        peer_state.fork_reconciliation_blocks.push(*received_block);
31✔
469

470
        // Try fetch parent
471
        let received_block_header = *peer_state
31✔
472
            .fork_reconciliation_blocks
31✔
473
            .last()
31✔
474
            .unwrap()
31✔
475
            .header();
31✔
476

477
        let parent_digest = received_block_header.prev_block_digest;
31✔
478
        let parent_height = received_block_header.height.previous()
31✔
479
            .expect("transferred block must have previous height because genesis block cannot be transferred");
31✔
480
        debug!("Try ensure path: fetching parent block");
31✔
481
        let parent_block = self
31✔
482
            .global_state_lock
31✔
483
            .lock_guard()
31✔
484
            .await
31✔
485
            .chain
486
            .archival_state()
31✔
487
            .get_block(parent_digest)
31✔
488
            .await?;
31✔
489
        debug!(
31✔
490
            "Completed parent block fetching from DB: {}",
×
491
            if parent_block.is_some() {
×
492
                "found".to_string()
×
493
            } else {
494
                "not found".to_string()
×
495
            }
496
        );
497

498
        // If parent is not known (but not genesis) request it.
499
        let Some(parent_block) = parent_block else {
31✔
500
            if parent_height.is_genesis() {
11✔
501
                peer_state.fork_reconciliation_blocks.clear();
1✔
502
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
503
                return Ok(());
×
504
            }
10✔
505
            info!(
10✔
506
                "Parent not known: Requesting previous block with height {} from peer",
×
507
                parent_height
508
            );
509

510
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
511
                .await?;
10✔
512

513
            return Ok(());
10✔
514
        };
515

516
        // We want to treat the received fork reconciliation blocks (plus the
517
        // received block) in reverse order, from oldest to newest, because
518
        // they were requested from high to low block height.
519
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
20✔
520
        new_blocks.reverse();
20✔
521

522
        // Reset the fork resolution state since we got all the way back to a
523
        // block that we have.
524
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
20✔
525
        peer_state.fork_reconciliation_blocks.clear();
20✔
526

527
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
20✔
528
            // If `BlockNotification` was received during a block reconciliation
529
            // event, then the peer might have one (or more (unlikely)) blocks
530
            // that we do not have. We should thus request those blocks.
531
            if fork_reconciliation_event
18✔
532
                && peer_state.highest_shared_block_height > new_block_height
18✔
533
            {
534
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
535
                    peer_state.highest_shared_block_height,
1✔
536
                ))
1✔
537
                .await?;
1✔
538
            }
17✔
539
        }
2✔
540

541
        Ok(())
20✔
542
    }
32✔
543

544
    /// Handle peer messages and returns Ok(true) if connection should be closed.
545
    /// Connection should also be closed if an error is returned.
546
    /// Otherwise, returns OK(false).
547
    ///
548
    /// Locking:
549
    ///   * Acquires `global_state_lock` for read.
550
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
551
    ///     `self.reward(..)`.
552
    async fn handle_peer_message<S>(
147✔
553
        &mut self,
147✔
554
        msg: PeerMessage,
147✔
555
        peer: &mut S,
147✔
556
        peer_state_info: &mut MutablePeerState,
147✔
557
    ) -> Result<bool>
147✔
558
    where
147✔
559
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
147✔
560
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
147✔
561
        <S as TryStream>::Error: std::error::Error,
147✔
562
    {
147✔
563
        debug!(
147✔
564
            "Received {} from peer {}",
×
565
            msg.get_type(),
×
566
            self.peer_address
567
        );
568
        match msg {
147✔
569
            PeerMessage::Bye => {
570
                // Note that the current peer is not removed from the global_state.peer_map here
571
                // but that this is done by the caller.
572
                info!("Got bye. Closing connection to peer");
40✔
573
                Ok(DISCONNECT_CONNECTION)
40✔
574
            }
575
            PeerMessage::PeerListRequest => {
576
                let peer_info = {
5✔
577
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
5✔
578

579
                    // We are interested in the address on which peers accept ingoing connections,
580
                    // not in the address in which they are connected to us. We are only interested in
581
                    // peers that accept incoming connections.
582
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
5✔
583
                        .global_state_lock
5✔
584
                        .lock_guard()
5✔
585
                        .await
5✔
586
                        .net
587
                        .peer_map
588
                        .values()
5✔
589
                        .filter(|peer_info| peer_info.listen_address().is_some())
8✔
590
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
5✔
591
                        .map(|peer_info| {
8✔
592
                            (
8✔
593
                                // unwrap is safe bc of above `filter`
8✔
594
                                peer_info.listen_address().unwrap(),
8✔
595
                                peer_info.instance_id(),
8✔
596
                            )
8✔
597
                        })
8✔
598
                        .collect();
5✔
599

600
                    // We sort the returned list, so this function is easier to test
601
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
602
                    peer_info
5✔
603
                };
604

605
                debug!("Responding with: {:?}", peer_info);
5✔
606
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
5✔
607
                Ok(KEEP_CONNECTION_ALIVE)
5✔
608
            }
609
            PeerMessage::PeerListResponse(peers) => {
3✔
610
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
3✔
611

612
                if peers.len() > MAX_PEER_LIST_LENGTH {
3✔
613
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
614
                        .await?;
×
615
                }
3✔
616
                self.to_main_tx
3✔
617
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
3✔
618
                        peers,
3✔
619
                        self.peer_address,
3✔
620
                        // The distance to the revealed peers is 1 + this peer's distance
3✔
621
                        self.distance + 1,
3✔
622
                    )))
3✔
623
                    .await?;
3✔
624
                Ok(KEEP_CONNECTION_ALIVE)
3✔
625
            }
626
            PeerMessage::BlockNotificationRequest => {
627
                debug!("Got BlockNotificationRequest");
×
628

629
                peer.send(PeerMessage::BlockNotification(
×
630
                    self.global_state_lock
×
631
                        .lock_guard()
×
632
                        .await
×
633
                        .chain
634
                        .light_state()
×
635
                        .into(),
×
636
                ))
637
                .await?;
×
638

639
                Ok(KEEP_CONNECTION_ALIVE)
×
640
            }
641
            PeerMessage::BlockNotification(block_notification) => {
16✔
642
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
643

644
                let (tip_header, sync_anchor_is_set) = {
16✔
645
                    let state = self.global_state_lock.lock_guard().await;
16✔
646
                    (
16✔
647
                        *state.chain.light_state().header(),
16✔
648
                        state.net.sync_anchor.is_some(),
16✔
649
                    )
16✔
650
                };
651
                debug!(
16✔
652
                    "Got BlockNotification of height {}. Own height is {}",
×
653
                    block_notification.height, tip_header.height
654
                );
655

656
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
16✔
657
                let now = self.now();
16✔
658
                let time_since_latest_successful_challenge = peer_state_info
16✔
659
                    .successful_sync_challenge_response_time
16✔
660
                    .map(|then| now - then);
16✔
661
                let cooldown_expired = time_since_latest_successful_challenge
16✔
662
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
16✔
663
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
16✔
664
                    &tip_header,
16✔
665
                    block_notification.height,
16✔
666
                    block_notification.cumulative_proof_of_work,
16✔
667
                    sync_mode_threshold,
16✔
668
                );
669
                if cooldown_expired && exceeds_sync_mode_threshold {
16✔
670
                    debug!("sync mode criterion satisfied.");
1✔
671

672
                    if peer_state_info.sync_challenge.is_some() {
1✔
673
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
674
                        return Ok(KEEP_CONNECTION_ALIVE);
×
675
                    }
1✔
676

677
                    info!(
1✔
678
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
679
                    );
680
                    let challenge = SyncChallenge::generate(
1✔
681
                        &block_notification,
1✔
682
                        tip_header.height,
1✔
683
                        self.rng.random(),
1✔
684
                    );
685
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
686
                        challenge,
1✔
687
                        block_notification.cumulative_proof_of_work,
1✔
688
                        self.now(),
1✔
689
                    ));
1✔
690

691
                    debug!("sending challenge ...");
1✔
692
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
693

694
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
695
                }
15✔
696

697
                peer_state_info.highest_shared_block_height = block_notification.height;
15✔
698
                let block_is_new = tip_header.cumulative_proof_of_work
15✔
699
                    < block_notification.cumulative_proof_of_work;
15✔
700

701
                debug!("block_is_new: {}", block_is_new);
15✔
702

703
                if block_is_new
15✔
704
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
15✔
705
                    && !sync_anchor_is_set
14✔
706
                    && !exceeds_sync_mode_threshold
14✔
707
                {
708
                    debug!(
13✔
709
                        "sending BlockRequestByHeight to peer for block with height {}",
×
710
                        block_notification.height
711
                    );
712
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
13✔
713
                        .await?;
13✔
714
                } else {
715
                    debug!(
2✔
716
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
717
                        block_notification.height,
718
                        block_is_new,
719
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
720
                    );
721
                }
722

723
                Ok(KEEP_CONNECTION_ALIVE)
15✔
724
            }
725
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
726
                let response = {
×
727
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
728

729
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
730

731
                    let response = self
2✔
732
                        .global_state_lock
2✔
733
                        .lock_guard()
2✔
734
                        .await
2✔
735
                        .response_to_sync_challenge(sync_challenge)
2✔
736
                        .await;
2✔
737

738
                    match response {
2✔
739
                        Ok(resp) => resp,
×
740
                        Err(e) => {
2✔
741
                            warn!("could not generate sync challenge response:\n{e}");
2✔
742
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
743
                                .await?;
2✔
744
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
745
                        }
746
                    }
747
                };
748

749
                info!(
×
750
                    "Responding to sync challenge from {}",
×
751
                    self.peer_address.ip()
×
752
                );
753
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
754
                    .await?;
×
755

756
                Ok(KEEP_CONNECTION_ALIVE)
×
757
            }
758
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
759
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
760

761
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
762
                info!(
1✔
763
                    "Got sync challenge response from {}",
×
764
                    self.peer_address.ip()
×
765
                );
766

767
                // The purpose of the sync challenge and sync challenge response
768
                // is to avoid going into sync mode based on a malicious target
769
                // fork. Instead of verifying that the claimed proof-of-work
770
                // number is correct (which would require sending and verifying,
771
                // at least, all blocks between luca (whatever that is) and the
772
                // claimed tip), we use a heuristic that requires less
773
                // communication and less verification work. The downside of
774
                // using a heuristic here is a nonzero false positive and false
775
                // negative rate. Note that the false negative event
776
                // (maliciously sending someone into sync mode based on a bogus
777
                // fork) still requires a significant amount of work from the
778
                // attacker, *in addition* to being lucky. Also, down the line
779
                // succinctness (and more specifically, recursive block
780
                // validation) obviates this entire subprotocol.
781

782
                // Did we issue a challenge?
783
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
784
                    warn!("Sync challenge response was not prompted.");
×
785
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
786
                        .await?;
×
787
                    return Ok(KEEP_CONNECTION_ALIVE);
×
788
                };
789

790
                // Reset the challenge, regardless of the response's success.
791
                peer_state_info.sync_challenge = None;
1✔
792

793
                // Does response match issued challenge?
794
                if !challenge_response
1✔
795
                    .matches(self.global_state_lock.cli().network, issued_challenge)
1✔
796
                {
797
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
798
                        .await?;
×
799
                    return Ok(KEEP_CONNECTION_ALIVE);
×
800
                }
1✔
801

802
                // Does response verify?
803
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
804
                let now = self.now();
1✔
805
                if !challenge_response
1✔
806
                    .is_valid(now, self.global_state_lock.cli().network)
1✔
807
                    .await
1✔
808
                {
809
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
810
                        .await?;
×
811
                    return Ok(KEEP_CONNECTION_ALIVE);
×
812
                }
1✔
813

814
                // Does cumulative proof-of-work evolve reasonably?
815
                let own_tip_header = *self
1✔
816
                    .global_state_lock
1✔
817
                    .lock_guard()
1✔
818
                    .await
1✔
819
                    .chain
820
                    .light_state()
1✔
821
                    .header();
1✔
822
                if !challenge_response
1✔
823
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
824
                {
825
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
826
                        .await?;
×
827
                    return Ok(KEEP_CONNECTION_ALIVE);
×
828
                }
1✔
829

830
                // Is there some specific (*i.e.*, not aggregate) proof of work?
831
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
832
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
833
                        .await?;
×
834
                    return Ok(KEEP_CONNECTION_ALIVE);
×
835
                }
1✔
836

837
                // Did it come in time?
838
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
839
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
840
                        .await?;
×
841
                    return Ok(KEEP_CONNECTION_ALIVE);
×
842
                }
1✔
843

844
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
845
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
846

847
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
848
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
849

850
                // Inform main loop
851
                self.to_main_tx
1✔
852
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
853
                        peer_address: self.peer_address,
1✔
854
                        claimed_height: claimed_tip_height,
1✔
855
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
856
                        claimed_block_mmra: sync_mmra_anchor,
1✔
857
                    })
1✔
858
                    .await?;
1✔
859

860
                Ok(KEEP_CONNECTION_ALIVE)
1✔
861
            }
862
            PeerMessage::BlockRequestByHash(block_digest) => {
×
863
                let block = self
×
864
                    .global_state_lock
×
865
                    .lock_guard()
×
866
                    .await
×
867
                    .chain
868
                    .archival_state()
×
869
                    .get_block(block_digest)
×
870
                    .await?;
×
871

872
                match block {
×
873
                    None => {
874
                        // TODO: Consider punishing here
875
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
876
                        Ok(KEEP_CONNECTION_ALIVE)
×
877
                    }
878
                    Some(b) => {
×
879
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
880
                            .await?;
×
881
                        Ok(KEEP_CONNECTION_ALIVE)
×
882
                    }
883
                }
884
            }
885
            PeerMessage::BlockRequestByHeight(block_height) => {
16✔
886
                let block_response = {
15✔
887
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
16✔
888

889
                    debug!("Got BlockRequestByHeight of height {}", block_height);
16✔
890

891
                    let canonical_block_digest = self
16✔
892
                        .global_state_lock
16✔
893
                        .lock_guard()
16✔
894
                        .await
16✔
895
                        .chain
896
                        .archival_state()
16✔
897
                        .archival_block_mmr
898
                        .ammr()
16✔
899
                        .try_get_leaf(block_height.into())
16✔
900
                        .await;
16✔
901

902
                    let Some(canonical_block_digest) = canonical_block_digest else {
16✔
903
                        let own_tip_height = self
1✔
904
                            .global_state_lock
1✔
905
                            .lock_guard()
1✔
906
                            .await
1✔
907
                            .chain
908
                            .light_state()
1✔
909
                            .header()
1✔
910
                            .height;
911
                        warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
912
                        self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
913
                            .await?;
1✔
914

915
                        return Ok(KEEP_CONNECTION_ALIVE);
1✔
916
                    };
917

918
                    let canonical_chain_block: Block = self
15✔
919
                        .global_state_lock
15✔
920
                        .lock_guard()
15✔
921
                        .await
15✔
922
                        .chain
923
                        .archival_state()
15✔
924
                        .get_block(canonical_block_digest)
15✔
925
                        .await?
15✔
926
                        .unwrap();
15✔
927

928
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
15✔
929
                };
930

931
                debug!("Sending block");
15✔
932
                peer.send(block_response).await?;
15✔
933
                debug!("Sent block");
15✔
934
                Ok(KEEP_CONNECTION_ALIVE)
15✔
935
            }
936
            PeerMessage::Block(t_block) => {
32✔
937
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
32✔
938

939
                info!(
32✔
940
                    "Got new block from peer {}, height {}, mined {}",
×
941
                    self.peer_address,
942
                    t_block.header.height,
943
                    t_block.header.timestamp.standard_format()
×
944
                );
945
                let new_block_height = t_block.header.height;
32✔
946

947
                let block = match Block::try_from(*t_block) {
32✔
948
                    Ok(block) => Box::new(block),
32✔
949
                    Err(e) => {
×
950
                        warn!("Peer sent invalid block: {e:?}");
×
951
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
952
                            .await?;
×
953

954
                        return Ok(KEEP_CONNECTION_ALIVE);
×
955
                    }
956
                };
957

958
                // Update the value for the highest known height that peer possesses iff
959
                // we are not in a fork reconciliation state.
960
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
32✔
961
                    peer_state_info.highest_shared_block_height = new_block_height;
22✔
962
                }
22✔
963

964
                self.try_ensure_path(block, peer, peer_state_info).await?;
32✔
965

966
                // Reward happens as part of `try_ensure_path`
967

968
                Ok(KEEP_CONNECTION_ALIVE)
31✔
969
            }
970
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
971
                known_blocks,
8✔
972
                max_response_len,
8✔
973
                anchor,
8✔
974
            }) => {
975
                debug!(
8✔
976
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
977
                    self.peer_address
978
                );
979

980
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
981
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
982
                        .await?;
×
983

984
                    return Ok(KEEP_CONNECTION_ALIVE);
×
985
                }
8✔
986

987
                // The last block in the list of the peers known block is the
988
                // earliest block, block with lowest height, the peer has
989
                // requested. If it does not belong to canonical chain, none of
990
                // the later will. So we can do an early abort in that case.
991
                let least_preferred = match known_blocks.last() {
8✔
992
                    Some(least_preferred) => *least_preferred,
8✔
993
                    None => {
994
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
995
                            .await?;
×
996

997
                        return Ok(KEEP_CONNECTION_ALIVE);
×
998
                    }
999
                };
1000

1001
                let state = self.global_state_lock.lock_guard().await;
8✔
1002
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1003
                let luca_is_known = state
8✔
1004
                    .chain
8✔
1005
                    .archival_state()
8✔
1006
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1007
                    .await;
8✔
1008
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1009
                    drop(state);
×
1010
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1011
                        .await?;
×
1012
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1013

1014
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1015
                }
8✔
1016

1017
                // Happy case: At least *one* of the blocks referenced by peer
1018
                // is known to us.
1019
                let first_block_in_response = {
8✔
1020
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1021
                    for block_digest in known_blocks {
10✔
1022
                        if state
10✔
1023
                            .chain
10✔
1024
                            .archival_state()
10✔
1025
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1026
                            .await
10✔
1027
                        {
1028
                            let height = state
8✔
1029
                                .chain
8✔
1030
                                .archival_state()
8✔
1031
                                .get_block_header(block_digest)
8✔
1032
                                .await
8✔
1033
                                .unwrap()
8✔
1034
                                .height;
1035
                            first_block_in_response = Some(height);
8✔
1036
                            debug!(
8✔
1037
                                "Found block in canonical chain for batch response: {}",
×
1038
                                block_digest
1039
                            );
1040
                            break;
8✔
1041
                        }
2✔
1042
                    }
1043

1044
                    first_block_in_response
8✔
1045
                        .expect("existence of LUCA should have been established already.")
8✔
1046
                };
1047

1048
                debug!(
8✔
1049
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1050
                 Now building response from that height."
×
1051
                );
1052

1053
                // Get the relevant blocks, at most batch-size many, descending from the
1054
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1055
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1056
                let max_response_len = cmp::min(
8✔
1057
                    max_response_len,
8✔
1058
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1059
                );
1060
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1061
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1062

1063
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1064
                let response_start_height: u64 = first_block_in_response.into();
8✔
1065
                let mut i: u64 = 1;
8✔
1066
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1067
                    let block_height = response_start_height + i;
31✔
1068
                    match state
31✔
1069
                        .chain
31✔
1070
                        .archival_state()
31✔
1071
                        .archival_block_mmr
31✔
1072
                        .ammr()
31✔
1073
                        .try_get_leaf(block_height)
31✔
1074
                        .await
31✔
1075
                    {
1076
                        Some(digest) => {
23✔
1077
                            digests_of_returned_blocks.push(digest);
23✔
1078
                        }
23✔
1079
                        None => break,
8✔
1080
                    }
1081
                    i += 1;
23✔
1082
                }
1083

1084
                let mut returned_blocks: Vec<Block> =
8✔
1085
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1086
                for block_digest in digests_of_returned_blocks {
31✔
1087
                    let block = state
23✔
1088
                        .chain
23✔
1089
                        .archival_state()
23✔
1090
                        .get_block(block_digest)
23✔
1091
                        .await?
23✔
1092
                        .unwrap();
23✔
1093
                    returned_blocks.push(block);
23✔
1094
                }
1095

1096
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1097

1098
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1099
                drop(state);
8✔
1100

1101
                let Some(response) = response else {
8✔
1102
                    warn!("Unable to satisfy batch-block request");
×
1103
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1104
                        .await?;
×
1105
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1106
                };
1107

1108
                debug!("Returning {} blocks in batch response", response.len());
8✔
1109

1110
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1111
                peer.send(response).await?;
8✔
1112

1113
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1114
            }
1115
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1116
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1117

1118
                debug!(
×
1119
                    "handling block response batch with {} blocks",
×
1120
                    authenticated_blocks.len()
×
1121
                );
1122

1123
                // (Alan:) why is there even a minimum?
1124
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1125
                    warn!("Got smaller batch response than allowed");
×
1126
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1127
                        .await?;
×
1128
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1129
                }
×
1130

1131
                // Verify that we are in fact in syncing mode
1132
                // TODO: Separate peer messages into those allowed under syncing
1133
                // and those that are not
1134
                let Some(sync_anchor) = self
×
1135
                    .global_state_lock
×
1136
                    .lock_guard()
×
1137
                    .await
×
1138
                    .net
1139
                    .sync_anchor
1140
                    .clone()
×
1141
                else {
1142
                    warn!("Received a batch of blocks without being in syncing mode");
×
1143
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1144
                        .await?;
×
1145
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1146
                };
1147

1148
                // Verify that the response matches the current state
1149
                // We get the latest block from the DB here since this message is
1150
                // only valid for archival nodes.
1151
                let (first_block, _) = &authenticated_blocks[0];
×
1152
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1153
                let most_canonical_own_block_match: Option<Block> = self
×
1154
                    .global_state_lock
×
1155
                    .lock_guard()
×
1156
                    .await
×
1157
                    .chain
1158
                    .archival_state()
×
1159
                    .get_block(first_blocks_parent_digest)
×
1160
                    .await
×
1161
                    .expect("Block lookup must succeed");
×
1162
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1163
                    Some(block) => block,
×
1164
                    None => {
1165
                        warn!("Got batch response with invalid start block");
×
1166
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1167
                            .await?;
×
1168
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1169
                    }
1170
                };
1171

1172
                // Convert all blocks to Block objects
1173
                debug!(
×
1174
                    "Found own block of height {} to match received batch",
×
1175
                    most_canonical_own_block_match.kernel.header.height
×
1176
                );
1177
                let mut received_blocks = vec![];
×
1178
                for (t_block, membership_proof) in authenticated_blocks {
×
1179
                    let Ok(block) = Block::try_from(t_block) else {
×
1180
                        warn!("Received invalid transfer block from peer");
×
1181
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1182
                            .await?;
×
1183
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1184
                    };
1185

1186
                    if !membership_proof.verify(
×
1187
                        block.header().height.into(),
×
1188
                        block.hash(),
×
1189
                        &sync_anchor.block_mmr.peaks(),
×
1190
                        sync_anchor.block_mmr.num_leafs(),
×
1191
                    ) {
×
1192
                        warn!("Authentication of received block fails relative to anchor");
×
1193
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1194
                            .await?;
×
1195
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1196
                    }
×
1197

1198
                    received_blocks.push(block);
×
1199
                }
1200

1201
                // Get the latest block that we know of and handle all received blocks
1202
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1203
                    .await?;
×
1204

1205
                // Reward happens as part of `handle_blocks`.
1206

1207
                Ok(KEEP_CONNECTION_ALIVE)
×
1208
            }
1209
            PeerMessage::UnableToSatisfyBatchRequest => {
1210
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1211
                warn!(
×
1212
                    "Peer {} reports inability to satisfy batch request.",
×
1213
                    self.peer_address
1214
                );
1215

1216
                Ok(KEEP_CONNECTION_ALIVE)
×
1217
            }
1218
            PeerMessage::Handshake(_) => {
1219
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1220

1221
                // The handshake should have been sent during connection
1222
                // initialization. Here it is out of order at best, malicious at
1223
                // worst.
1224
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1225
                Ok(KEEP_CONNECTION_ALIVE)
×
1226
            }
1227
            PeerMessage::ConnectionStatus(_) => {
1228
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1229

1230
                // The connection status should have been sent during connection
1231
                // initialization. Here it is out of order at best, malicious at
1232
                // worst.
1233

1234
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1235
                Ok(KEEP_CONNECTION_ALIVE)
×
1236
            }
1237
            PeerMessage::Transaction(transaction) => {
5✔
1238
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
5✔
1239

1240
                debug!(
5✔
1241
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1242
                    transaction.kernel.inputs.len(),
×
1243
                    transaction.kernel.outputs.len(),
×
1244
                    transaction.kernel.mutator_set_hash
×
1245
                );
1246

1247
                let transaction: Transaction = (*transaction).into();
5✔
1248

1249
                // 1. If transaction is invalid, punish.
1250
                if !transaction
5✔
1251
                    .is_valid(self.global_state_lock.cli().network)
5✔
1252
                    .await
5✔
1253
                {
1254
                    warn!("Received invalid tx");
×
1255
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1256
                        .await?;
×
1257
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1258
                }
5✔
1259

1260
                // 2. If transaction has coinbase, punish.
1261
                // Transactions received from peers have not been mined yet.
1262
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
1263
                if transaction.kernel.coinbase.is_some() {
5✔
1264
                    warn!("Received non-mined transaction with coinbase.");
×
1265
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1266
                        .await?;
×
1267
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1268
                }
5✔
1269

1270
                // 3. If negative fee, punish.
1271
                if transaction.kernel.fee.is_negative() {
5✔
1272
                    warn!("Received negative-fee transaction.");
×
1273
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1274
                        .await?;
×
1275
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1276
                }
5✔
1277

1278
                // 4. If transaction is already known, ignore.
1279
                if self
5✔
1280
                    .global_state_lock
5✔
1281
                    .lock_guard()
5✔
1282
                    .await
5✔
1283
                    .mempool
1284
                    .contains_with_higher_proof_quality(
5✔
1285
                        transaction.kernel.txid(),
5✔
1286
                        transaction.proof.proof_quality()?,
5✔
1287
                    )
1288
                {
1289
                    warn!("Received transaction that was already known");
×
1290

1291
                    // We received a transaction that we *probably* haven't requested.
1292
                    // Consider punishing here, if this is abused.
1293
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1294
                }
5✔
1295

1296
                // 5. if transaction is not confirmable, punish.
1297
                let (tip, mutator_set_accumulator_after) = {
5✔
1298
                    let state = self.global_state_lock.lock_guard().await;
5✔
1299

1300
                    (
5✔
1301
                        state.chain.light_state().hash(),
5✔
1302
                        state.chain.light_state().mutator_set_accumulator_after(),
5✔
1303
                    )
5✔
1304
                };
1305
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
5✔
1306
                    warn!(
×
1307
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1308
                        transaction.kernel.txid()
×
1309
                    );
1310
                    // get fine-grained error code for informative logging
1311
                    let confirmability_error_code = transaction
×
1312
                        .kernel
×
1313
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1314
                    match confirmability_error_code {
×
1315
                        Ok(_) => unreachable!(),
×
1316
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1317
                            warn!("invalid removal record (at index {index})");
×
1318
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1319
                            let removal_record_error_code = invalid_removal_record
×
1320
                                .validate_inner(&mutator_set_accumulator_after);
×
1321
                            debug!(
×
1322
                                "Absolute index set of removal record {index}: {:?}",
×
1323
                                invalid_removal_record.absolute_indices
1324
                            );
1325
                            match removal_record_error_code {
×
1326
                                Ok(_) => unreachable!(),
×
1327
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1328
                                    debug!("invalid because membership proof is missing");
×
1329
                                }
1330
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1331
                                    chunk_index,
×
1332
                                }) => {
1333
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1334
                                }
1335
                            };
1336
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1337
                                .await?;
×
1338
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1339
                        }
1340
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1341
                            warn!("duplicate inputs");
×
1342
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1343
                                .await?;
×
1344
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1345
                        }
1346
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1347
                            warn!("already spent input (at index {index})");
×
1348
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1349
                                .await?;
×
1350
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1351
                        }
1352
                    };
1353
                }
5✔
1354

1355
                // If transaction cannot be applied to mutator set, punish.
1356
                // I don't think this can happen when above checks pass but we include
1357
                // the check to ensure that transaction can be applied.
1358
                let ms_update = MutatorSetUpdate::new(
5✔
1359
                    transaction.kernel.inputs.clone(),
5✔
1360
                    transaction.kernel.outputs.clone(),
5✔
1361
                );
1362
                let can_apply = ms_update
5✔
1363
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
5✔
1364
                    .is_ok();
5✔
1365
                if !can_apply {
5✔
1366
                    warn!("Cannot apply transaction to current mutator set.");
×
1367
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1368
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1369
                        .await?;
×
1370
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1371
                }
5✔
1372

1373
                let tx_timestamp = transaction.kernel.timestamp;
5✔
1374

1375
                // 6. Ignore if transaction is too old
1376
                let now = self.now();
5✔
1377
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
5✔
1378
                    // TODO: Consider punishing here
1379
                    warn!("Received too old tx");
×
1380
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1381
                }
5✔
1382

1383
                // 7. Ignore if transaction is too far into the future
1384
                if tx_timestamp
5✔
1385
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
5✔
1386
                {
1387
                    // TODO: Consider punishing here
1388
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1389
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1390
                }
5✔
1391

1392
                // Otherwise, relay to main
1393
                let pt2m_transaction = PeerTaskToMainTransaction {
5✔
1394
                    transaction,
5✔
1395
                    confirmable_for_block: tip,
5✔
1396
                };
5✔
1397
                self.to_main_tx
5✔
1398
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
5✔
1399
                    .await?;
5✔
1400

1401
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1402
            }
1403
            PeerMessage::TransactionNotification(tx_notification) => {
12✔
1404
                // addresses #457
1405
                // new scope for state read-lock to avoid holding across peer.send()
1406
                {
1407
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
12✔
1408

1409
                    // 1. Ignore if we already know this transaction, and
1410
                    // the proof quality is not higher than what we already know.
1411
                    let state = self.global_state_lock.lock_guard().await;
12✔
1412
                    let transaction_of_same_or_higher_proof_quality_is_known =
12✔
1413
                        state.mempool.contains_with_higher_proof_quality(
12✔
1414
                            tx_notification.txid,
12✔
1415
                            tx_notification.proof_quality,
12✔
1416
                        );
1417
                    if transaction_of_same_or_higher_proof_quality_is_known {
12✔
1418
                        debug!("transaction with same or higher proof quality was already known");
7✔
1419
                        return Ok(KEEP_CONNECTION_ALIVE);
7✔
1420
                    }
5✔
1421

1422
                    // Only accept transactions that do not require executing
1423
                    // `update`.
1424
                    if state
5✔
1425
                        .chain
5✔
1426
                        .light_state()
5✔
1427
                        .mutator_set_accumulator_after()
5✔
1428
                        .hash()
5✔
1429
                        != tx_notification.mutator_set_hash
5✔
1430
                    {
1431
                        debug!("transaction refers to non-canonical mutator set state");
×
1432
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1433
                    }
5✔
1434
                }
1435

1436
                // 2. Request the actual `Transaction` from peer
1437
                debug!("requesting transaction from peer");
5✔
1438
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
5✔
1439
                    .await?;
5✔
1440

1441
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1442
            }
1443
            PeerMessage::TransactionRequest(transaction_identifier) => {
5✔
1444
                let state = self.global_state_lock.lock_guard().await;
5✔
1445
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
5✔
1446
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1447
                };
1448

1449
                let Ok(transfer_transaction) = transaction.try_into() else {
4✔
1450
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1451
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1452
                };
1453

1454
                // Drop state immediately to prevent holding over a response.
1455
                drop(state);
4✔
1456

1457
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
4✔
1458
                    .await?;
4✔
1459

1460
                Ok(KEEP_CONNECTION_ALIVE)
4✔
1461
            }
1462
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1463
                let verdict = self
1✔
1464
                    .global_state_lock
1✔
1465
                    .lock_guard()
1✔
1466
                    .await
1✔
1467
                    .favor_incoming_block_proposal(
1✔
1468
                        block_proposal_notification.height,
1✔
1469
                        block_proposal_notification.guesser_fee,
1✔
1470
                    );
1471
                match verdict {
1✔
1472
                    Ok(_) => {
1473
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1474
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1475
                        ))
1✔
1476
                        .await?
1✔
1477
                    }
1478
                    Err(reject_reason) => {
×
1479
                        info!(
×
1480
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1481
                        {}. Reason:\n{reject_reason}",
×
1482
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1483
                        self.peer_address
1484
                    )
1485
                    }
1486
                }
1487

1488
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1489
            }
1490
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1491
                let matching_proposal = self
×
1492
                    .global_state_lock
×
1493
                    .lock_guard()
×
1494
                    .await
×
1495
                    .mining_state
1496
                    .block_proposal
1497
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1498
                    .map(|x| x.to_owned());
×
1499
                if let Some(proposal) = matching_proposal {
×
1500
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1501
                        .await?;
×
1502
                } else {
1503
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1504
                        .await?;
×
1505
                }
1506

1507
                Ok(KEEP_CONNECTION_ALIVE)
×
1508
            }
1509
            PeerMessage::BlockProposal(block) => {
1✔
1510
                info!("Got block proposal from peer.");
1✔
1511

1512
                let should_punish = {
1✔
1513
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1514

1515
                    let (verdict, tip) = {
1✔
1516
                        let state = self.global_state_lock.lock_guard().await;
1✔
1517

1518
                        let verdict = state.favor_incoming_block_proposal(
1✔
1519
                            block.header().height,
1✔
1520
                            block.total_guesser_reward(),
1✔
1521
                        );
1✔
1522
                        let tip = state.chain.light_state().to_owned();
1✔
1523
                        (verdict, tip)
1✔
1524
                    };
1525

1526
                    if let Err(rejection_reason) = verdict {
1✔
1527
                        match rejection_reason {
×
1528
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1529
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1530
                                if Some(received) == current =>
×
1531
                            {
1532
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1533
                                None
×
1534
                            }
1535
                            _ => {
1536
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1537
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1538
                            }
1539
                        }
1540
                    } else {
1541
                        // Verify validity and that proposal is child of current tip
1542
                        if block
1✔
1543
                            .is_valid(&tip, self.now(), self.global_state_lock.cli().network)
1✔
1544
                            .await
1✔
1545
                        {
1546
                            None // all is well, no punishment.
1✔
1547
                        } else {
1548
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1549
                        }
1550
                    }
1551
                };
1552

1553
                if let Some(sanction) = should_punish {
1✔
1554
                    self.punish(sanction).await?;
×
1555
                } else {
1556
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1557
                        .await?;
1✔
1558

1559
                    // Valuable, new, hard-to-produce information. Reward peer.
1560
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1561
                }
1562

1563
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1564
            }
1565
        }
1566
    }
147✔
1567

1568
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1569
    ///
1570
    /// the channel could potentially fill up in which case the send() will
1571
    /// block until there is capacity.  we wrap the send() so we can log if
1572
    /// that ever happens to the extent it passes slow-scope threshold.
1573
    async fn send_to_main(
1✔
1574
        &self,
1✔
1575
        msg: PeerTaskToMain,
1✔
1576
        line: u32,
1✔
1577
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1578
        // we measure across the send() in case the channel ever fills up.
1579
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1580

1581
        self.to_main_tx.send(msg).await
1✔
1582
    }
1✔
1583

1584
    /// Handle message from main task. The boolean return value indicates if
1585
    /// the connection should be closed.
1586
    ///
1587
    /// Locking:
1588
    ///   * acquires `global_state_lock` for write via Self::punish()
1589
    async fn handle_main_task_message<S>(
28✔
1590
        &mut self,
28✔
1591
        msg: MainToPeerTask,
28✔
1592
        peer: &mut S,
28✔
1593
        peer_state_info: &mut MutablePeerState,
28✔
1594
    ) -> Result<bool>
28✔
1595
    where
28✔
1596
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
28✔
1597
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
28✔
1598
        <S as TryStream>::Error: std::error::Error,
28✔
1599
    {
28✔
1600
        debug!("Handling {} message from main in peer loop", msg.get_type());
28✔
1601
        match msg {
28✔
1602
            MainToPeerTask::Block(block) => {
22✔
1603
                // We don't currently differentiate whether a new block came from a peer, or from our
1604
                // own miner. It's always shared through this logic.
1605
                let new_block_height = block.kernel.header.height;
22✔
1606
                if new_block_height > peer_state_info.highest_shared_block_height {
22✔
1607
                    debug!("Sending PeerMessage::BlockNotification");
12✔
1608
                    peer_state_info.highest_shared_block_height = new_block_height;
12✔
1609
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
12✔
1610
                        .await?;
12✔
1611
                    debug!("Sent PeerMessage::BlockNotification");
12✔
1612
                }
10✔
1613
                Ok(KEEP_CONNECTION_ALIVE)
22✔
1614
            }
1615
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1616
                // Only ask one of the peers about the batch of blocks
1617
                if batch_block_request.peer_addr_target != self.peer_address {
×
1618
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1619
                }
×
1620

1621
                let max_response_len = std::cmp::min(
×
1622
                    STANDARD_BLOCK_BATCH_SIZE,
1623
                    self.global_state_lock.cli().sync_mode_threshold,
×
1624
                );
1625

1626
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1627
                    known_blocks: batch_block_request.known_blocks,
×
1628
                    max_response_len,
×
1629
                    anchor: batch_block_request.anchor_mmr,
×
1630
                }))
×
1631
                .await?;
×
1632

1633
                Ok(KEEP_CONNECTION_ALIVE)
×
1634
            }
1635
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1636
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1637

1638
                if self.peer_address != socket_addr {
×
1639
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1640
                }
×
1641

1642
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1643
                    .await?;
×
1644

1645
                // If this peer failed the last synchronization attempt, we only
1646
                // sanction, we don't disconnect.
1647
                Ok(KEEP_CONNECTION_ALIVE)
×
1648
            }
1649
            MainToPeerTask::MakePeerDiscoveryRequest => {
1650
                peer.send(PeerMessage::PeerListRequest).await?;
×
1651
                Ok(KEEP_CONNECTION_ALIVE)
×
1652
            }
1653
            MainToPeerTask::Disconnect(peer_address) => {
×
1654
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1655

1656
                // Only disconnect from the peer the main task requested a disconnect for.
1657
                if peer_address != self.peer_address {
×
1658
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1659
                }
×
1660
                self.register_peer_disconnection().await;
×
1661

1662
                Ok(DISCONNECT_CONNECTION)
×
1663
            }
1664
            MainToPeerTask::DisconnectAll() => {
1665
                self.register_peer_disconnection().await;
×
1666

1667
                Ok(DISCONNECT_CONNECTION)
×
1668
            }
1669
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1670
                if target_socket_addr == self.peer_address {
×
1671
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1672
                }
×
1673
                Ok(KEEP_CONNECTION_ALIVE)
×
1674
            }
1675
            MainToPeerTask::TransactionNotification(transaction_notification) => {
6✔
1676
                debug!("Sending PeerMessage::TransactionNotification");
6✔
1677
                peer.send(PeerMessage::TransactionNotification(
6✔
1678
                    transaction_notification,
6✔
1679
                ))
6✔
1680
                .await?;
6✔
1681
                debug!("Sent PeerMessage::TransactionNotification");
6✔
1682
                Ok(KEEP_CONNECTION_ALIVE)
6✔
1683
            }
1684
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1685
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1686
                peer.send(PeerMessage::BlockProposalNotification(
×
1687
                    block_proposal_notification,
×
1688
                ))
×
1689
                .await?;
×
1690
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1691
                Ok(KEEP_CONNECTION_ALIVE)
×
1692
            }
1693
        }
1694
    }
28✔
1695

1696
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1697
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1698
    async fn run<S>(
47✔
1699
        &mut self,
47✔
1700
        mut peer: S,
47✔
1701
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
47✔
1702
        peer_state_info: &mut MutablePeerState,
47✔
1703
    ) -> Result<()>
47✔
1704
    where
47✔
1705
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
47✔
1706
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
47✔
1707
        <S as TryStream>::Error: std::error::Error,
47✔
1708
    {
47✔
1709
        loop {
1710
            select! {
181✔
1711
                // Handle peer messages
1712
                peer_message = peer.try_next() => {
181✔
1713
                    let peer_address = self.peer_address;
147✔
1714
                    let peer_message = match peer_message {
147✔
1715
                        Ok(message) => message,
147✔
1716
                        Err(err) => {
×
1717
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
1718
                            error!("{msg}. Error: {err}");
×
1719
                            bail!("{msg}. Closing connection.");
×
1720
                        }
1721
                    };
1722
                    let Some(peer_message) = peer_message else {
147✔
1723
                        info!("Peer {peer_address} closed connection.");
×
1724
                        break;
×
1725
                    };
1726

1727
                    let syncing =
147✔
1728
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
147✔
1729
                    let message_type = peer_message.get_type();
147✔
1730
                    if peer_message.ignore_during_sync() && syncing {
147✔
1731
                        debug!(
×
1732
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1733
                        );
1734
                        continue;
×
1735
                    }
147✔
1736
                    if peer_message.ignore_when_not_sync() && !syncing {
147✔
1737
                        debug!(
×
1738
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1739
                        );
1740
                        continue;
×
1741
                    }
147✔
1742

1743
                    match self
147✔
1744
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
147✔
1745
                        .await
147✔
1746
                    {
1747
                        Ok(false) => {}
106✔
1748
                        Ok(true) => {
1749
                            info!("Closing connection to {peer_address}");
40✔
1750
                            break;
40✔
1751
                        }
1752
                        Err(err) => {
1✔
1753
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1754
                            bail!("{err}");
1✔
1755
                        }
1756
                    };
1757
                }
1758

1759
                // Handle messages from main task
1760
                main_msg_res = from_main_rx.recv() => {
181✔
1761
                    let main_msg = main_msg_res.unwrap_or_else(|err| {
28✔
UNCOV
1762
                        let err_msg = format!("Failed to read from main loop: {err}");
×
UNCOV
1763
                        error!(err_msg);
×
UNCOV
1764
                        panic!("{err_msg}");
×
1765
                    });
1766
                    let close_connection = self
28✔
1767
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
28✔
1768
                        .await
28✔
1769
                        .unwrap_or_else(|err| {
28✔
1770
                            warn!("handle_main_task_message returned an error: {err}");
×
1771
                            true
×
1772
                        });
×
1773

1774
                    if close_connection {
28✔
1775
                        info!(
×
1776
                            "handle_main_task_message is closing the connection to {}",
×
1777
                            self.peer_address
1778
                        );
1779
                        break;
×
1780
                    }
28✔
1781
                }
1782
            }
1783
        }
1784

1785
        Ok(())
40✔
1786
    }
41✔
1787

1788
    /// Function called before entering the peer loop. Reads the potentially stored
1789
    /// peer standing from the database and does other book-keeping before entering
1790
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1791
    /// accepted for a connection for this loop to be entered. So we don't need
1792
    /// to check the standing again.
1793
    ///
1794
    /// Locking:
1795
    ///   * acquires `global_state_lock` for write
1796
    pub(crate) async fn run_wrapper<S>(
38✔
1797
        &mut self,
38✔
1798
        mut peer: S,
38✔
1799
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
38✔
1800
    ) -> Result<()>
38✔
1801
    where
38✔
1802
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
38✔
1803
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
38✔
1804
        <S as TryStream>::Error: std::error::Error,
38✔
1805
    {
38✔
1806
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1807

1808
        let cli_args = self.global_state_lock.cli().clone();
38✔
1809

1810
        let standing = self
38✔
1811
            .global_state_lock
38✔
1812
            .lock_guard()
38✔
1813
            .await
38✔
1814
            .net
1815
            .peer_databases
1816
            .peer_standings
1817
            .get(self.peer_address.ip())
38✔
1818
            .await
38✔
1819
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
37✔
1820

1821
        // Add peer to peer map
1822
        let peer_connection_info = PeerConnectionInfo::new(
37✔
1823
            self.peer_handshake_data.listen_port,
37✔
1824
            self.peer_address,
37✔
1825
            self.inbound_connection,
37✔
1826
        );
1827
        let new_peer = PeerInfo::new(
37✔
1828
            peer_connection_info,
37✔
1829
            &self.peer_handshake_data,
37✔
1830
            SystemTime::now(),
37✔
1831
            cli_args.peer_tolerance,
37✔
1832
        )
1833
        .with_standing(standing);
37✔
1834

1835
        // If timestamps are different, we currently just log a warning.
1836
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
37✔
1837
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
37✔
1838
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
37✔
1839
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
37✔
1840
        {
1841
            let own_datetime_utc: DateTime<Utc> =
×
1842
                new_peer.own_timestamp_connection_established.into();
×
1843
            let peer_datetime_utc: DateTime<Utc> =
×
1844
                new_peer.peer_timestamp_connection_established.into();
×
1845
            warn!(
×
1846
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1847
                new_peer.connected_address(),
×
1848
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1849
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1850
        }
37✔
1851

1852
        // Multiple tasks might attempt to set up a connection concurrently. So
1853
        // even though we've checked that this connection is allowed, this check
1854
        // could have been invalidated by another task, for one accepting an
1855
        // incoming connection from a peer we're currently connecting to. So we
1856
        // need to make the a check again while holding a write-lock, since
1857
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
1858
        // since it would have to be dropped before acquiring the write-lock.
1859
        {
1860
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
37✔
1861
            let peer_map = &mut global_state.net.peer_map;
37✔
1862
            if peer_map
37✔
1863
                .values()
37✔
1864
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
37✔
1865
            {
1866
                bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1867
            }
37✔
1868

1869
            if peer_map.len() >= cli_args.max_num_peers {
37✔
1870
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1871
            }
37✔
1872

1873
            if peer_map.contains_key(&self.peer_address) {
37✔
1874
                // This shouldn't be possible, unless the peer reports a different instance ID than
1875
                // for the other connection. Only a malignant client would do that.
1876
                bail!("Already connected to peer. Aborting connection");
×
1877
            }
37✔
1878

1879
            peer_map.insert(self.peer_address, new_peer);
37✔
1880
        }
1881

1882
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1883
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
37✔
1884

1885
        // If peer indicates more canonical block, request a block notification to catch up ASAP
1886
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
37✔
1887
            > self
37✔
1888
                .global_state_lock
37✔
1889
                .lock_guard()
37✔
1890
                .await
37✔
1891
                .chain
1892
                .light_state()
37✔
1893
                .kernel
1894
                .header
1895
                .cumulative_proof_of_work
1896
        {
1897
            // Send block notification request to catch up ASAP, in case we're
1898
            // behind the newly-connected peer.
1899
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1900
        }
37✔
1901

1902
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
37✔
1903
        debug!("Exited peer loop for {}", self.peer_address);
31✔
1904

1905
        close_peer_connected_callback(
31✔
1906
            self.global_state_lock.clone(),
31✔
1907
            self.peer_address,
31✔
1908
            &self.to_main_tx,
31✔
1909
        )
31✔
1910
        .await;
31✔
1911

1912
        debug!("Ending peer loop for {}", self.peer_address);
31✔
1913

1914
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1915
        // feature to have for testing purposes.
1916
        res
31✔
1917
    }
31✔
1918

1919
    /// Register graceful peer disconnection in the global state.
1920
    ///
1921
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1922
    ///
1923
    /// # Locking:
1924
    ///   * acquires `global_state_lock` for write
1925
    ///
1926
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1927
    async fn register_peer_disconnection(&mut self) {
×
1928
        let peer_id = self.peer_handshake_data.instance_id;
×
1929
        self.global_state_lock
×
1930
            .lock_guard_mut()
×
1931
            .await
×
1932
            .net
1933
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1934
    }
×
1935
}
1936

1937
#[cfg(test)]
1938
#[cfg_attr(coverage_nightly, coverage(off))]
1939
mod tests {
1940
    use macro_rules_attr::apply;
1941
    use rand::rngs::StdRng;
1942
    use rand::Rng;
1943
    use rand::SeedableRng;
1944
    use tokio::sync::mpsc::error::TryRecvError;
1945
    use tracing_test::traced_test;
1946

1947
    use super::*;
1948
    use crate::config_models::cli_args;
1949
    use crate::config_models::network::Network;
1950
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1951
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1952
    use crate::models::peer::transaction_notification::TransactionNotification;
1953
    use crate::models::state::mempool::TransactionOrigin;
1954
    use crate::models::state::tx_creation_config::TxCreationConfig;
1955
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1956
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1957
    use crate::tests::shared::fake_valid_block_for_tests;
1958
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1959
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1960
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1961
    use crate::tests::shared::get_dummy_socket_address;
1962
    use crate::tests::shared::get_test_genesis_setup;
1963
    use crate::tests::shared::invalid_empty_single_proof_transaction;
1964
    use crate::tests::shared::Action;
1965
    use crate::tests::shared::Mock;
1966
    use crate::tests::shared_tokio_runtime;
1967

1968
    #[traced_test]
1969
    #[apply(shared_tokio_runtime)]
1970
    async fn test_peer_loop_bye() -> Result<()> {
1971
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1972

1973
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1974
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default()).await?;
1975

1976
        let peer_address = get_dummy_socket_address(2);
1977
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1978
        let mut peer_loop_handler =
1979
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1980
        peer_loop_handler
1981
            .run_wrapper(mock, from_main_rx_clone)
1982
            .await?;
1983

1984
        assert_eq!(
1985
            2,
1986
            state_lock.lock_guard().await.net.peer_map.len(),
1987
            "peer map length must be back to 2 after goodbye"
1988
        );
1989

1990
        Ok(())
1991
    }
1992

1993
    #[traced_test]
1994
    #[apply(shared_tokio_runtime)]
1995
    async fn test_peer_loop_peer_list() {
1996
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
1997
            get_test_genesis_setup(Network::Beta, 2, cli_args::Args::default())
1998
                .await
1999
                .unwrap();
2000

2001
        let mut peer_infos = state_lock
2002
            .lock_guard()
2003
            .await
2004
            .net
2005
            .peer_map
2006
            .clone()
2007
            .into_values()
2008
            .collect::<Vec<_>>();
2009
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2010
        let (peer_address0, instance_id0) = (
2011
            peer_infos[0].connected_address(),
2012
            peer_infos[0].instance_id(),
2013
        );
2014
        let (peer_address1, instance_id1) = (
2015
            peer_infos[1].connected_address(),
2016
            peer_infos[1].instance_id(),
2017
        );
2018

2019
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Beta, 2);
2020
        let expected_response = vec![
2021
            (peer_address0, instance_id0),
2022
            (peer_address1, instance_id1),
2023
            (sa2, hsd2.instance_id),
2024
        ];
2025
        let mock = Mock::new(vec![
2026
            Action::Read(PeerMessage::PeerListRequest),
2027
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
2028
            Action::Read(PeerMessage::Bye),
2029
        ]);
2030

2031
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2032

2033
        let mut peer_loop_handler =
2034
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
2035
        peer_loop_handler
2036
            .run_wrapper(mock, from_main_rx_clone)
2037
            .await
2038
            .unwrap();
2039

2040
        assert_eq!(
2041
            2,
2042
            state_lock.lock_guard().await.net.peer_map.len(),
2043
            "peer map must have length 2 after saying goodbye to peer 2"
2044
        );
2045
    }
2046

2047
    #[traced_test]
2048
    #[apply(shared_tokio_runtime)]
2049
    async fn different_genesis_test() -> Result<()> {
2050
        // In this scenario a peer provides another genesis block than what has been
2051
        // hardcoded. This should lead to the closing of the connection to this peer
2052
        // and a ban.
2053

2054
        let network = Network::Main;
2055
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2056
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2057
        assert_eq!(1000, state_lock.cli().peer_tolerance);
2058
        let peer_address = get_dummy_socket_address(0);
2059

2060
        // Although the database is empty, `get_latest_block` still returns the genesis block,
2061
        // since that block is hardcoded.
2062
        let mut different_genesis_block = state_lock
2063
            .lock_guard()
2064
            .await
2065
            .chain
2066
            .archival_state()
2067
            .get_tip()
2068
            .await;
2069

2070
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
2071
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
2072
            &different_genesis_block,
2073
            Timestamp::hours(1),
2074
            StdRng::seed_from_u64(5550001).random(),
2075
            network,
2076
        )
2077
        .await;
2078
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
2079
            block_1_with_different_genesis.try_into().unwrap(),
2080
        )))]);
2081

2082
        let mut peer_loop_handler = PeerLoopHandler::new(
2083
            to_main_tx.clone(),
2084
            state_lock.clone(),
2085
            peer_address,
2086
            hsd,
2087
            true,
2088
            1,
2089
        );
2090
        let res = peer_loop_handler
2091
            .run_wrapper(mock, from_main_rx_clone)
2092
            .await;
2093
        assert!(
2094
            res.is_err(),
2095
            "run_wrapper must return failure when genesis is different"
2096
        );
2097

2098
        match to_main_rx1.recv().await {
2099
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2100
            _ => bail!("Must receive remove of peer block max height"),
2101
        }
2102

2103
        // Verify that no further message was sent to main loop
2104
        match to_main_rx1.try_recv() {
2105
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2106
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2107
        };
2108

2109
        drop(to_main_tx);
2110

2111
        let peer_standing = state_lock
2112
            .lock_guard()
2113
            .await
2114
            .net
2115
            .get_peer_standing_from_database(peer_address.ip())
2116
            .await;
2117
        assert_eq!(
2118
            -i32::from(state_lock.cli().peer_tolerance),
2119
            peer_standing.unwrap().standing
2120
        );
2121
        assert_eq!(
2122
            NegativePeerSanction::DifferentGenesis,
2123
            peer_standing.unwrap().latest_punishment.unwrap().0
2124
        );
2125

2126
        Ok(())
2127
    }
2128

2129
    #[traced_test]
2130
    #[apply(shared_tokio_runtime)]
2131
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
2132
    {
2133
        let args = cli_args::Args::default();
2134
        let network = args.network;
2135
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
2136
            get_test_genesis_setup(network, 0, args).await?;
2137

2138
        let peer_address = get_dummy_socket_address(0);
2139
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
2140
        let peer_id = peer_handshake_data.instance_id;
2141
        let mut peer_loop_handler = PeerLoopHandler::new(
2142
            to_main_tx,
2143
            state_lock.clone(),
2144
            peer_address,
2145
            peer_handshake_data,
2146
            true,
2147
            1,
2148
        );
2149
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
2150
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
2151

2152
        let global_state = state_lock.lock_guard().await;
2153
        assert!(global_state
2154
            .net
2155
            .last_disconnection_time_of_peer(peer_id)
2156
            .is_none());
2157

2158
        drop(to_main_rx);
2159
        drop(from_main_tx);
2160

2161
        Ok(())
2162
    }
2163

2164
    #[traced_test]
2165
    #[apply(shared_tokio_runtime)]
2166
    async fn block_without_valid_pow_test() -> Result<()> {
2167
        // In this scenario, a block without a valid PoW is received. This block should be rejected
2168
        // by the peer loop and a notification should never reach the main loop.
2169

2170
        let network = Network::Main;
2171
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2172
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2173
        let peer_address = get_dummy_socket_address(0);
2174
        let genesis_block: Block = state_lock
2175
            .lock_guard()
2176
            .await
2177
            .chain
2178
            .archival_state()
2179
            .get_tip()
2180
            .await;
2181

2182
        // Make a with hash above what the implied threshold from
2183
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
2184
            &genesis_block,
2185
            Timestamp::hours(1),
2186
            StdRng::seed_from_u64(5550001).random(),
2187
            network,
2188
        )
2189
        .await;
2190

2191
        // This *probably* is invalid PoW -- and needs to be for this test to
2192
        // work.
2193
        block_without_valid_pow.set_header_nonce(Digest::default());
2194

2195
        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
2196
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
2197
        let mock = Mock::new(vec![
2198
            Action::Read(PeerMessage::Block(Box::new(
2199
                block_without_valid_pow.clone().try_into().unwrap(),
2200
            ))),
2201
            Action::Read(PeerMessage::Bye),
2202
        ]);
2203

2204
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2205

2206
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2207
            to_main_tx.clone(),
2208
            state_lock.clone(),
2209
            peer_address,
2210
            hsd,
2211
            true,
2212
            1,
2213
            block_without_valid_pow.header().timestamp,
2214
        );
2215
        peer_loop_handler
2216
            .run_wrapper(mock, from_main_rx_clone)
2217
            .await
2218
            .expect("sending (one) invalid block should not result in closed connection");
2219

2220
        match to_main_rx1.recv().await {
2221
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2222
            _ => bail!("Must receive remove of peer block max height"),
2223
        }
2224

2225
        // Verify that no further message was sent to main loop
2226
        match to_main_rx1.try_recv() {
2227
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2228
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2229
        };
2230

2231
        // We need to have the transmitter in scope until we have received from it
2232
        // otherwise the receiver will report the disconnected error when we attempt
2233
        // to read from it. And the purpose is to verify that the channel is empty,
2234
        // not that it has been closed.
2235
        drop(to_main_tx);
2236

2237
        // Verify that peer standing was stored in database
2238
        let standing = state_lock
2239
            .lock_guard()
2240
            .await
2241
            .net
2242
            .peer_databases
2243
            .peer_standings
2244
            .get(peer_address.ip())
2245
            .await
2246
            .unwrap();
2247
        assert!(
2248
            standing.standing < 0,
2249
            "Peer must be sanctioned for sending a bad block"
2250
        );
2251

2252
        Ok(())
2253
    }
2254

2255
    #[traced_test]
2256
    #[apply(shared_tokio_runtime)]
2257
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
2258
        // The scenario tested here is that a client receives a block that is already
2259
        // known and stored. The expected behavior is to ignore the block and not send
2260
        // a message to the main task.
2261

2262
        let network = Network::Main;
2263
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
2264
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2265
        let peer_address = get_dummy_socket_address(0);
2266
        let genesis_block: Block = Block::genesis(network);
2267

2268
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
2269
        let block_1 =
2270
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
2271
        assert!(
2272
            block_1.is_valid(&genesis_block, now, network).await,
2273
            "Block must be valid for this test to make sense"
2274
        );
2275
        alice.set_new_tip(block_1.clone()).await?;
2276

2277
        let mock_peer_messages = Mock::new(vec![
2278
            Action::Read(PeerMessage::Block(Box::new(
2279
                block_1.clone().try_into().unwrap(),
2280
            ))),
2281
            Action::Read(PeerMessage::Bye),
2282
        ]);
2283

2284
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
2285

2286
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
2287
            to_main_tx.clone(),
2288
            alice.clone(),
2289
            peer_address,
2290
            hsd,
2291
            false,
2292
            1,
2293
            block_1.header().timestamp,
2294
        );
2295
        alice_peer_loop_handler
2296
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
2297
            .await?;
2298

2299
        match to_main_rx1.recv().await {
2300
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2301
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
2302
        }
2303
        match to_main_rx1.try_recv() {
2304
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2305
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
2306
        };
2307
        drop(to_main_tx);
2308

2309
        if !alice.lock_guard().await.net.peer_map.is_empty() {
2310
            bail!("peer map must be empty after closing connection gracefully");
2311
        }
2312

2313
        Ok(())
2314
    }
2315

2316
    #[traced_test]
2317
    #[apply(shared_tokio_runtime)]
2318
    async fn block_request_batch_simple() {
2319
        // Scenario: Six blocks (including genesis) are known. Peer requests
2320
        // from all possible starting points, and client responds with the
2321
        // correct list of blocks.
2322
        let network = Network::Main;
2323
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2324
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2325
                .await
2326
                .unwrap();
2327
        let genesis_block: Block = Block::genesis(network);
2328
        let peer_address = get_dummy_socket_address(0);
2329
        let [block_1, block_2, block_3, block_4, block_5] =
2330
            fake_valid_sequence_of_blocks_for_tests(
2331
                &genesis_block,
2332
                Timestamp::hours(1),
2333
                StdRng::seed_from_u64(5550001).random(),
2334
                network,
2335
            )
2336
            .await;
2337
        let blocks = vec![
2338
            genesis_block,
2339
            block_1,
2340
            block_2,
2341
            block_3,
2342
            block_4,
2343
            block_5.clone(),
2344
        ];
2345
        for block in blocks.iter().skip(1) {
2346
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
2347
        }
2348

2349
        let mmra = state_lock
2350
            .lock_guard()
2351
            .await
2352
            .chain
2353
            .archival_state()
2354
            .archival_block_mmr
2355
            .ammr()
2356
            .to_accumulator_async()
2357
            .await;
2358
        for i in 0..=4 {
2359
            let expected_response = {
2360
                let state = state_lock.lock_guard().await;
2361
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
2362
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
2363
                    .await
2364
                    .unwrap()
2365
            };
2366
            let mock = Mock::new(vec![
2367
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2368
                    known_blocks: vec![blocks[i].hash()],
2369
                    max_response_len: 14,
2370
                    anchor: mmra.clone(),
2371
                })),
2372
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
2373
                Action::Read(PeerMessage::Bye),
2374
            ]);
2375
            let mut peer_loop_handler = PeerLoopHandler::new(
2376
                to_main_tx.clone(),
2377
                state_lock.clone(),
2378
                peer_address,
2379
                hsd.clone(),
2380
                false,
2381
                1,
2382
            );
2383

2384
            peer_loop_handler
2385
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
2386
                .await
2387
                .unwrap();
2388
        }
2389
    }
2390

2391
    #[traced_test]
2392
    #[apply(shared_tokio_runtime)]
2393
    async fn block_request_batch_in_order_test() -> Result<()> {
2394
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2395
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
2396
        // are returned.
2397

2398
        let network = Network::Main;
2399
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2400
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2401
        let genesis_block: Block = Block::genesis(network);
2402
        let peer_address = get_dummy_socket_address(0);
2403
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2404
            &genesis_block,
2405
            Timestamp::hours(1),
2406
            StdRng::seed_from_u64(5550001).random(),
2407
            network,
2408
        )
2409
        .await;
2410
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2411
            &block_1,
2412
            Timestamp::hours(1),
2413
            StdRng::seed_from_u64(5550002).random(),
2414
            network,
2415
        )
2416
        .await;
2417
        assert_ne!(block_2_b.hash(), block_2_a.hash());
2418

2419
        state_lock.set_new_tip(block_1.clone()).await?;
2420
        state_lock.set_new_tip(block_2_a.clone()).await?;
2421
        state_lock.set_new_tip(block_2_b.clone()).await?;
2422
        state_lock.set_new_tip(block_3_b.clone()).await?;
2423
        state_lock.set_new_tip(block_3_a.clone()).await?;
2424

2425
        let anchor = state_lock
2426
            .lock_guard()
2427
            .await
2428
            .chain
2429
            .archival_state()
2430
            .archival_block_mmr
2431
            .ammr()
2432
            .to_accumulator_async()
2433
            .await;
2434
        let response_1 = {
2435
            let state_lock = state_lock.lock_guard().await;
2436
            PeerLoopHandler::batch_response(
2437
                &state_lock,
2438
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
2439
                &anchor,
2440
            )
2441
            .await
2442
            .unwrap()
2443
        };
2444

2445
        let mut mock = Mock::new(vec![
2446
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2447
                known_blocks: vec![genesis_block.hash()],
2448
                max_response_len: 14,
2449
                anchor: anchor.clone(),
2450
            })),
2451
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
2452
            Action::Read(PeerMessage::Bye),
2453
        ]);
2454

2455
        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
2456
            to_main_tx.clone(),
2457
            state_lock.clone(),
2458
            peer_address,
2459
            hsd.clone(),
2460
            false,
2461
            1,
2462
            block_3_a.header().timestamp,
2463
        );
2464

2465
        peer_loop_handler_1
2466
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
2467
            .await?;
2468

2469
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2470
        let response_2 = {
2471
            let state_lock = state_lock.lock_guard().await;
2472
            PeerLoopHandler::batch_response(
2473
                &state_lock,
2474
                vec![block_2_a, block_3_a.clone()],
2475
                &anchor,
2476
            )
2477
            .await
2478
            .unwrap()
2479
        };
2480
        mock = Mock::new(vec![
2481
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2482
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
2483
                max_response_len: 14,
2484
                anchor,
2485
            })),
2486
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
2487
            Action::Read(PeerMessage::Bye),
2488
        ]);
2489

2490
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2491
            to_main_tx.clone(),
2492
            state_lock.clone(),
2493
            peer_address,
2494
            hsd,
2495
            false,
2496
            1,
2497
            block_3_a.header().timestamp,
2498
        );
2499

2500
        peer_loop_handler_2
2501
            .run_wrapper(mock, from_main_rx_clone)
2502
            .await?;
2503

2504
        Ok(())
2505
    }
2506

2507
    #[traced_test]
2508
    #[apply(shared_tokio_runtime)]
2509
    async fn block_request_batch_out_of_order_test() -> Result<()> {
2510
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2511
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
2512
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
2513
        // The blocks will be supplied in the correct order but starting from the first digest in
2514
        // the list that is known and canonical.
2515

2516
        let network = Network::Main;
2517
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2518
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2519
        let genesis_block = Block::genesis(network);
2520
        let peer_address = get_dummy_socket_address(0);
2521
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2522
            &genesis_block,
2523
            Timestamp::hours(1),
2524
            StdRng::seed_from_u64(5550001).random(),
2525
            network,
2526
        )
2527
        .await;
2528
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2529
            &block_1,
2530
            Timestamp::hours(1),
2531
            StdRng::seed_from_u64(5550002).random(),
2532
            network,
2533
        )
2534
        .await;
2535
        assert_ne!(block_2_a.hash(), block_2_b.hash());
2536

2537
        state_lock.set_new_tip(block_1.clone()).await?;
2538
        state_lock.set_new_tip(block_2_a.clone()).await?;
2539
        state_lock.set_new_tip(block_2_b.clone()).await?;
2540
        state_lock.set_new_tip(block_3_b.clone()).await?;
2541
        state_lock.set_new_tip(block_3_a.clone()).await?;
2542

2543
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
2544
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
2545
        expected_anchor.append(block_3_a.hash());
2546
        let state_anchor = state_lock
2547
            .lock_guard()
2548
            .await
2549
            .chain
2550
            .archival_state()
2551
            .archival_block_mmr
2552
            .ammr()
2553
            .to_accumulator_async()
2554
            .await;
2555
        assert_eq!(
2556
            expected_anchor, state_anchor,
2557
            "Catching assumption about MMRA in tip and in archival state"
2558
        );
2559

2560
        let response = {
2561
            let state_lock = state_lock.lock_guard().await;
2562
            PeerLoopHandler::batch_response(
2563
                &state_lock,
2564
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
2565
                &expected_anchor,
2566
            )
2567
            .await
2568
            .unwrap()
2569
        };
2570
        let mock = Mock::new(vec![
2571
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
2572
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
2573
                max_response_len: 14,
2574
                anchor: expected_anchor,
2575
            })),
2576
            // Since genesis block is the 1st known in the list of known blocks,
2577
            // it's immediate descendent, block_1, is the first one returned.
2578
            Action::Write(PeerMessage::BlockResponseBatch(response)),
2579
            Action::Read(PeerMessage::Bye),
2580
        ]);
2581

2582
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
2583
            to_main_tx.clone(),
2584
            state_lock.clone(),
2585
            peer_address,
2586
            hsd,
2587
            false,
2588
            1,
2589
            block_3_a.header().timestamp,
2590
        );
2591

2592
        peer_loop_handler_2
2593
            .run_wrapper(mock, from_main_rx_clone)
2594
            .await?;
2595

2596
        Ok(())
2597
    }
2598

2599
    #[traced_test]
2600
    #[apply(shared_tokio_runtime)]
2601
    async fn request_unknown_height_doesnt_crash() {
2602
        // Scenario: Only genesis block is known. Peer requests block of height
2603
        // 2.
2604
        let network = Network::Main;
2605
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
2606
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2607
                .await
2608
                .unwrap();
2609
        let peer_address = get_dummy_socket_address(0);
2610
        let mock = Mock::new(vec![
2611
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2612
            Action::Read(PeerMessage::Bye),
2613
        ]);
2614

2615
        let mut peer_loop_handler = PeerLoopHandler::new(
2616
            to_main_tx.clone(),
2617
            state_lock.clone(),
2618
            peer_address,
2619
            hsd,
2620
            false,
2621
            1,
2622
        );
2623

2624
        // This will return error if seen read/write order does not match that of the
2625
        // mocked object.
2626
        peer_loop_handler
2627
            .run_wrapper(mock, from_main_rx_clone)
2628
            .await
2629
            .unwrap();
2630

2631
        // Verify that peer is sanctioned for this nonsense.
2632
        assert!(state_lock
2633
            .lock_guard()
2634
            .await
2635
            .net
2636
            .get_peer_standing_from_database(peer_address.ip())
2637
            .await
2638
            .unwrap()
2639
            .standing
2640
            .is_negative());
2641
    }
2642

2643
    #[traced_test]
2644
    #[apply(shared_tokio_runtime)]
2645
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
2646
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
2647
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
2648
        // returned.
2649

2650
        let network = Network::Main;
2651
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
2652
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2653
        let genesis_block = Block::genesis(network);
2654
        let peer_address = get_dummy_socket_address(0);
2655

2656
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
2657
            &genesis_block,
2658
            Timestamp::hours(1),
2659
            StdRng::seed_from_u64(5550001).random(),
2660
            network,
2661
        )
2662
        .await;
2663
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
2664
            &block_1,
2665
            Timestamp::hours(1),
2666
            StdRng::seed_from_u64(5550002).random(),
2667
            network,
2668
        )
2669
        .await;
2670
        assert_ne!(block_2_a.hash(), block_2_b.hash());
2671

2672
        state_lock.set_new_tip(block_1.clone()).await?;
2673
        state_lock.set_new_tip(block_2_a.clone()).await?;
2674
        state_lock.set_new_tip(block_2_b.clone()).await?;
2675
        state_lock.set_new_tip(block_3_b.clone()).await?;
2676
        state_lock.set_new_tip(block_3_a.clone()).await?;
2677

2678
        let mock = Mock::new(vec![
2679
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
2680
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
2681
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
2682
            Action::Write(PeerMessage::Block(Box::new(
2683
                block_3_a.clone().try_into().unwrap(),
2684
            ))),
2685
            Action::Read(PeerMessage::Bye),
2686
        ]);
2687

2688
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2689
            to_main_tx.clone(),
2690
            state_lock.clone(),
2691
            peer_address,
2692
            hsd,
2693
            false,
2694
            1,
2695
            block_3_a.header().timestamp,
2696
        );
2697

2698
        // This will return error if seen read/write order does not match that of the
2699
        // mocked object.
2700
        peer_loop_handler
2701
            .run_wrapper(mock, from_main_rx_clone)
2702
            .await?;
2703

2704
        Ok(())
2705
    }
2706

2707
    #[traced_test]
2708
    #[apply(shared_tokio_runtime)]
2709
    async fn receival_of_block_notification_height_1() {
2710
        // Scenario: client only knows genesis block. Then receives block
2711
        // notification of height 1. Must request block 1.
2712
        let network = Network::Main;
2713
        let mut rng = StdRng::seed_from_u64(5552401);
2714
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
2715
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2716
                .await
2717
                .unwrap();
2718
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2719
        let notification_height1 = (&block_1).into();
2720
        let mock = Mock::new(vec![
2721
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
2722
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
2723
            Action::Read(PeerMessage::Bye),
2724
        ]);
2725

2726
        let peer_address = get_dummy_socket_address(0);
2727
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2728
            to_main_tx.clone(),
2729
            state_lock.clone(),
2730
            peer_address,
2731
            hsd,
2732
            false,
2733
            1,
2734
            block_1.header().timestamp,
2735
        );
2736
        peer_loop_handler
2737
            .run_wrapper(mock, from_main_rx_clone)
2738
            .await
2739
            .unwrap();
2740

2741
        drop(to_main_rx1);
2742
    }
2743

2744
    #[traced_test]
2745
    #[apply(shared_tokio_runtime)]
2746
    async fn receive_block_request_by_height_block_7() {
2747
        // Scenario: client only knows blocks up to height 7. Then receives block-
2748
        // request-by-height for height 7. Must respond with block 7.
2749
        let network = Network::Main;
2750
        let mut rng = StdRng::seed_from_u64(5552401);
2751
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
2752
            get_test_genesis_setup(network, 0, cli_args::Args::default())
2753
                .await
2754
                .unwrap();
2755
        let genesis_block = Block::genesis(network);
2756
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
2757
            &genesis_block,
2758
            Timestamp::hours(1),
2759
            rng.random(),
2760
            network,
2761
        )
2762
        .await;
2763
        let block7 = blocks.last().unwrap().to_owned();
2764
        let tip_height: u64 = block7.header().height.into();
2765
        assert_eq!(7, tip_height);
2766

2767
        for block in &blocks {
2768
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
2769
        }
2770

2771
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
2772
        let mock = Mock::new(vec![
2773
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
2774
            Action::Write(block7_response),
2775
            Action::Read(PeerMessage::Bye),
2776
        ]);
2777

2778
        let peer_address = get_dummy_socket_address(0);
2779
        let mut peer_loop_handler = PeerLoopHandler::new(
2780
            to_main_tx.clone(),
2781
            state_lock.clone(),
2782
            peer_address,
2783
            hsd,
2784
            false,
2785
            1,
2786
        );
2787
        peer_loop_handler
2788
            .run_wrapper(mock, from_main_rx_clone)
2789
            .await
2790
            .unwrap();
2791

2792
        drop(to_main_rx1);
2793
    }
2794

2795
    #[traced_test]
2796
    #[apply(shared_tokio_runtime)]
2797
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
2798
        // Scenario: client only knows genesis block. Then receives block 1.
2799

2800
        let network = Network::Main;
2801
        let mut rng = StdRng::seed_from_u64(5550001);
2802
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2803
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2804
        let peer_address = get_dummy_socket_address(0);
2805

2806
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
2807
        let mock = Mock::new(vec![
2808
            Action::Read(PeerMessage::Block(Box::new(
2809
                block_1.clone().try_into().unwrap(),
2810
            ))),
2811
            Action::Read(PeerMessage::Bye),
2812
        ]);
2813

2814
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2815
            to_main_tx.clone(),
2816
            state_lock.clone(),
2817
            peer_address,
2818
            hsd,
2819
            false,
2820
            1,
2821
            block_1.header().timestamp,
2822
        );
2823
        peer_loop_handler
2824
            .run_wrapper(mock, from_main_rx_clone)
2825
            .await?;
2826

2827
        // Verify that a block was sent to `main_loop`
2828
        match to_main_rx1.recv().await {
2829
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
2830
            _ => bail!("Did not find msg sent to main task"),
2831
        };
2832

2833
        match to_main_rx1.recv().await {
2834
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2835
            _ => bail!("Must receive remove of peer block max height"),
2836
        }
2837

2838
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2839
            bail!("peer map must be empty after closing connection gracefully");
2840
        }
2841

2842
        Ok(())
2843
    }
2844

2845
    #[traced_test]
2846
    #[apply(shared_tokio_runtime)]
2847
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
2848
        // In this scenario, the client only knows the genesis block (block 0) and then
2849
        // receives block 2, meaning that block 1 will have to be requested.
2850

2851
        let network = Network::Main;
2852
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
2853
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
2854
        let peer_address = get_dummy_socket_address(0);
2855
        let genesis_block: Block = state_lock
2856
            .lock_guard()
2857
            .await
2858
            .chain
2859
            .archival_state()
2860
            .get_tip()
2861
            .await;
2862
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
2863
            &genesis_block,
2864
            Timestamp::hours(1),
2865
            StdRng::seed_from_u64(5550001).random(),
2866
            network,
2867
        )
2868
        .await;
2869

2870
        let mock = Mock::new(vec![
2871
            Action::Read(PeerMessage::Block(Box::new(
2872
                block_2.clone().try_into().unwrap(),
2873
            ))),
2874
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
2875
            Action::Read(PeerMessage::Block(Box::new(
2876
                block_1.clone().try_into().unwrap(),
2877
            ))),
2878
            Action::Read(PeerMessage::Bye),
2879
        ]);
2880

2881
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2882
            to_main_tx.clone(),
2883
            state_lock.clone(),
2884
            peer_address,
2885
            hsd,
2886
            true,
2887
            1,
2888
            block_2.header().timestamp,
2889
        );
2890
        peer_loop_handler
2891
            .run_wrapper(mock, from_main_rx_clone)
2892
            .await?;
2893

2894
        match to_main_rx1.recv().await {
2895
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
2896
                if blocks[0].hash() != block_1.hash() {
2897
                    bail!("1st received block by main loop must be block 1");
2898
                }
2899
                if blocks[1].hash() != block_2.hash() {
2900
                    bail!("2nd received block by main loop must be block 2");
2901
                }
2902
            }
2903
            _ => bail!("Did not find msg sent to main task 1"),
2904
        };
2905
        match to_main_rx1.recv().await {
2906
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2907
            _ => bail!("Must receive remove of peer block max height"),
2908
        }
2909

2910
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
2911
            bail!("peer map must be empty after closing connection gracefully");
2912
        }
2913

2914
        Ok(())
2915
    }
2916

2917
    #[traced_test]
2918
    #[apply(shared_tokio_runtime)]
2919
    async fn prevent_ram_exhaustion_test() -> Result<()> {
2920
        // In this scenario the peer sends more blocks than the client allows to store in the
2921
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
2922
        // process as the alternative is that the program will crash because it runs out of RAM.
2923

2924
        let network = Network::Main;
2925
        let mut rng = StdRng::seed_from_u64(5550001);
2926
        let (
2927
            _peer_broadcast_tx,
2928
            from_main_rx_clone,
2929
            to_main_tx,
2930
            mut to_main_rx1,
2931
            mut state_lock,
2932
            _hsd,
2933
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
2934
        let genesis_block = Block::genesis(network);
2935

2936
        // Restrict max number of blocks held in memory to 2.
2937
        let mut cli = state_lock.cli().clone();
2938
        cli.sync_mode_threshold = 2;
2939
        state_lock.set_cli(cli).await;
2940

2941
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Beta, 1);
2942
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
2943
            &genesis_block,
2944
            Timestamp::hours(1),
2945
            rng.random(),
2946
            network,
2947
        )
2948
        .await;
2949
        state_lock.set_new_tip(block_1.clone()).await?;
2950

2951
        let mock = Mock::new(vec![
2952
            Action::Read(PeerMessage::Block(Box::new(
2953
                block_4.clone().try_into().unwrap(),
2954
            ))),
2955
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
2956
            Action::Read(PeerMessage::Block(Box::new(
2957
                block_3.clone().try_into().unwrap(),
2958
            ))),
2959
            Action::Read(PeerMessage::Bye),
2960
        ]);
2961

2962
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
2963
            to_main_tx.clone(),
2964
            state_lock.clone(),
2965
            peer_address1,
2966
            hsd1,
2967
            true,
2968
            1,
2969
            block_4.header().timestamp,
2970
        );
2971
        peer_loop_handler
2972
            .run_wrapper(mock, from_main_rx_clone)
2973
            .await?;
2974

2975
        match to_main_rx1.recv().await {
2976
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
2977
            _ => bail!("Must receive remove of peer block max height"),
2978
        }
2979

2980
        // Verify that no block is sent to main loop.
2981
        match to_main_rx1.try_recv() {
2982
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
2983
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
2984
        };
2985
        drop(to_main_tx);
2986

2987
        // Verify that peer is sanctioned for failed fork reconciliation attempt
2988
        assert!(state_lock
2989
            .lock_guard()
2990
            .await
2991
            .net
2992
            .get_peer_standing_from_database(peer_address1.ip())
2993
            .await
2994
            .unwrap()
2995
            .standing
2996
            .is_negative());
2997

2998
        Ok(())
2999
    }
3000

3001
    #[traced_test]
3002
    #[apply(shared_tokio_runtime)]
3003
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
3004
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3005
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
3006

3007
        let network = Network::Main;
3008
        let (
3009
            _peer_broadcast_tx,
3010
            from_main_rx_clone,
3011
            to_main_tx,
3012
            mut to_main_rx1,
3013
            mut state_lock,
3014
            hsd,
3015
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3016
            .await
3017
            .unwrap();
3018
        let peer_address: SocketAddr = get_dummy_socket_address(0);
3019
        let genesis_block = Block::genesis(network);
3020
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3021
            &genesis_block,
3022
            Timestamp::hours(1),
3023
            StdRng::seed_from_u64(5550001).random(),
3024
            network,
3025
        )
3026
        .await;
3027
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
3028

3029
        let mock = Mock::new(vec![
3030
            Action::Read(PeerMessage::Block(Box::new(
3031
                block_4.clone().try_into().unwrap(),
3032
            ))),
3033
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3034
            Action::Read(PeerMessage::Block(Box::new(
3035
                block_3.clone().try_into().unwrap(),
3036
            ))),
3037
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3038
            Action::Read(PeerMessage::Block(Box::new(
3039
                block_2.clone().try_into().unwrap(),
3040
            ))),
3041
            Action::Read(PeerMessage::Bye),
3042
        ]);
3043

3044
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3045
            to_main_tx.clone(),
3046
            state_lock.clone(),
3047
            peer_address,
3048
            hsd,
3049
            true,
3050
            1,
3051
            block_4.header().timestamp,
3052
        );
3053
        peer_loop_handler
3054
            .run_wrapper(mock, from_main_rx_clone)
3055
            .await
3056
            .unwrap();
3057

3058
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
3059
            panic!("Did not find msg sent to main task");
3060
        };
3061
        assert_eq!(blocks[0].hash(), block_2.hash());
3062
        assert_eq!(blocks[1].hash(), block_3.hash());
3063
        assert_eq!(blocks[2].hash(), block_4.hash());
3064

3065
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
3066
            panic!("Must receive remove of peer block max height");
3067
        };
3068

3069
        assert!(
3070
            state_lock.lock_guard().await.net.peer_map.is_empty(),
3071
            "peer map must be empty after closing connection gracefully"
3072
        );
3073
    }
3074

3075
    #[traced_test]
3076
    #[apply(shared_tokio_runtime)]
3077
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
3078
        // In this scenario, the client only knows the genesis block (block 0) and then
3079
        // receives block 3, meaning that block 2 and 1 will have to be requested.
3080

3081
        let network = Network::Main;
3082
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
3083
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3084
        let peer_address = get_dummy_socket_address(0);
3085
        let genesis_block = Block::genesis(network);
3086

3087
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
3088
            &genesis_block,
3089
            Timestamp::hours(1),
3090
            StdRng::seed_from_u64(5550001).random(),
3091
            network,
3092
        )
3093
        .await;
3094

3095
        let mock = Mock::new(vec![
3096
            Action::Read(PeerMessage::Block(Box::new(
3097
                block_3.clone().try_into().unwrap(),
3098
            ))),
3099
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3100
            Action::Read(PeerMessage::Block(Box::new(
3101
                block_2.clone().try_into().unwrap(),
3102
            ))),
3103
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
3104
            Action::Read(PeerMessage::Block(Box::new(
3105
                block_1.clone().try_into().unwrap(),
3106
            ))),
3107
            Action::Read(PeerMessage::Bye),
3108
        ]);
3109

3110
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3111
            to_main_tx.clone(),
3112
            state_lock.clone(),
3113
            peer_address,
3114
            hsd,
3115
            true,
3116
            1,
3117
            block_3.header().timestamp,
3118
        );
3119
        peer_loop_handler
3120
            .run_wrapper(mock, from_main_rx_clone)
3121
            .await?;
3122

3123
        match to_main_rx1.recv().await {
3124
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3125
                if blocks[0].hash() != block_1.hash() {
3126
                    bail!("1st received block by main loop must be block 1");
3127
                }
3128
                if blocks[1].hash() != block_2.hash() {
3129
                    bail!("2nd received block by main loop must be block 2");
3130
                }
3131
                if blocks[2].hash() != block_3.hash() {
3132
                    bail!("3rd received block by main loop must be block 3");
3133
                }
3134
            }
3135
            _ => bail!("Did not find msg sent to main task"),
3136
        };
3137
        match to_main_rx1.recv().await {
3138
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3139
            _ => bail!("Must receive remove of peer block max height"),
3140
        }
3141

3142
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3143
            bail!("peer map must be empty after closing connection gracefully");
3144
        }
3145

3146
        Ok(())
3147
    }
3148

3149
    #[traced_test]
3150
    #[apply(shared_tokio_runtime)]
3151
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
3152
        // In this scenario, the client know the genesis block (block 0) and block 1, it
3153
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3154
        // But the requests are interrupted by the peer sending another message: a new block
3155
        // notification.
3156

3157
        let network = Network::Main;
3158
        let (
3159
            _peer_broadcast_tx,
3160
            from_main_rx_clone,
3161
            to_main_tx,
3162
            mut to_main_rx1,
3163
            mut state_lock,
3164
            hsd,
3165
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
3166
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
3167
        let genesis_block: Block = state_lock
3168
            .lock_guard()
3169
            .await
3170
            .chain
3171
            .archival_state()
3172
            .get_tip()
3173
            .await;
3174

3175
        let [block_1, block_2, block_3, block_4, block_5] =
3176
            fake_valid_sequence_of_blocks_for_tests(
3177
                &genesis_block,
3178
                Timestamp::hours(1),
3179
                StdRng::seed_from_u64(5550001).random(),
3180
                network,
3181
            )
3182
            .await;
3183
        state_lock.set_new_tip(block_1.clone()).await?;
3184

3185
        let mock = Mock::new(vec![
3186
            Action::Read(PeerMessage::Block(Box::new(
3187
                block_4.clone().try_into().unwrap(),
3188
            ))),
3189
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3190
            Action::Read(PeerMessage::Block(Box::new(
3191
                block_3.clone().try_into().unwrap(),
3192
            ))),
3193
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3194
            //
3195
            // Now make the interruption of the block reconciliation process
3196
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
3197
            //
3198
            // Complete the block reconciliation process by requesting the last block
3199
            // in this process, to get back to a mutually known block.
3200
            Action::Read(PeerMessage::Block(Box::new(
3201
                block_2.clone().try_into().unwrap(),
3202
            ))),
3203
            //
3204
            // Then anticipate the request of the block that was announced
3205
            // in the interruption.
3206
            // Note that we cannot anticipate the response, as only the main
3207
            // task writes to the database. And the database needs to be updated
3208
            // for the handling of block 5 to be done correctly.
3209
            Action::Write(PeerMessage::BlockRequestByHeight(
3210
                block_5.kernel.header.height,
3211
            )),
3212
            Action::Read(PeerMessage::Bye),
3213
        ]);
3214

3215
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3216
            to_main_tx.clone(),
3217
            state_lock.clone(),
3218
            peer_socket_address,
3219
            hsd,
3220
            false,
3221
            1,
3222
            block_5.header().timestamp,
3223
        );
3224
        peer_loop_handler
3225
            .run_wrapper(mock, from_main_rx_clone)
3226
            .await?;
3227

3228
        match to_main_rx1.recv().await {
3229
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3230
                if blocks[0].hash() != block_2.hash() {
3231
                    bail!("1st received block by main loop must be block 1");
3232
                }
3233
                if blocks[1].hash() != block_3.hash() {
3234
                    bail!("2nd received block by main loop must be block 2");
3235
                }
3236
                if blocks[2].hash() != block_4.hash() {
3237
                    bail!("3rd received block by main loop must be block 3");
3238
                }
3239
            }
3240
            _ => bail!("Did not find msg sent to main task"),
3241
        };
3242
        match to_main_rx1.recv().await {
3243
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
3244
            _ => bail!("Must receive remove of peer block max height"),
3245
        }
3246

3247
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
3248
            bail!("peer map must be empty after closing connection gracefully");
3249
        }
3250

3251
        Ok(())
3252
    }
3253

3254
    #[traced_test]
3255
    #[apply(shared_tokio_runtime)]
3256
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
3257
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
3258
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
3259
        // But the requests are interrupted by the peer sending another message: a request
3260
        // for a list of peers.
3261

3262
        let network = Network::Main;
3263
        let (
3264
            _peer_broadcast_tx,
3265
            from_main_rx_clone,
3266
            to_main_tx,
3267
            mut to_main_rx1,
3268
            mut state_lock,
3269
            _hsd,
3270
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
3271
        let genesis_block = Block::genesis(network);
3272
        let peer_infos: Vec<PeerInfo> = state_lock
3273
            .lock_guard()
3274
            .await
3275
            .net
3276
            .peer_map
3277
            .clone()
3278
            .into_values()
3279
            .collect::<Vec<_>>();
3280

3281
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
3282
            &genesis_block,
3283
            Timestamp::hours(1),
3284
            StdRng::seed_from_u64(5550001).random(),
3285
            network,
3286
        )
3287
        .await;
3288
        state_lock.set_new_tip(block_1.clone()).await?;
3289

3290
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3291
        let expected_peer_list_resp = vec![
3292
            (
3293
                peer_infos[0].listen_address().unwrap(),
3294
                peer_infos[0].instance_id(),
3295
            ),
3296
            (sa_1, hsd_1.instance_id),
3297
        ];
3298
        let mock = Mock::new(vec![
3299
            Action::Read(PeerMessage::Block(Box::new(
3300
                block_4.clone().try_into().unwrap(),
3301
            ))),
3302
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
3303
            Action::Read(PeerMessage::Block(Box::new(
3304
                block_3.clone().try_into().unwrap(),
3305
            ))),
3306
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
3307
            //
3308
            // Now make the interruption of the block reconciliation process
3309
            Action::Read(PeerMessage::PeerListRequest),
3310
            //
3311
            // Answer the request for a peer list
3312
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
3313
            //
3314
            // Complete the block reconciliation process by requesting the last block
3315
            // in this process, to get back to a mutually known block.
3316
            Action::Read(PeerMessage::Block(Box::new(
3317
                block_2.clone().try_into().unwrap(),
3318
            ))),
3319
            Action::Read(PeerMessage::Bye),
3320
        ]);
3321

3322
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3323
            to_main_tx,
3324
            state_lock.clone(),
3325
            sa_1,
3326
            hsd_1,
3327
            true,
3328
            1,
3329
            block_4.header().timestamp,
3330
        );
3331
        peer_loop_handler
3332
            .run_wrapper(mock, from_main_rx_clone)
3333
            .await?;
3334

3335
        // Verify that blocks are sent to `main_loop` in expected ordering
3336
        match to_main_rx1.recv().await {
3337
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
3338
                if blocks[0].hash() != block_2.hash() {
3339
                    bail!("1st received block by main loop must be block 1");
3340
                }
3341
                if blocks[1].hash() != block_3.hash() {
3342
                    bail!("2nd received block by main loop must be block 2");
3343
                }
3344
                if blocks[2].hash() != block_4.hash() {
3345
                    bail!("3rd received block by main loop must be block 3");
3346
                }
3347
            }
3348
            _ => bail!("Did not find msg sent to main task"),
3349
        };
3350

3351
        assert_eq!(
3352
            1,
3353
            state_lock.lock_guard().await.net.peer_map.len(),
3354
            "One peer must remain in peer list after peer_1 closed gracefully"
3355
        );
3356

3357
        Ok(())
3358
    }
3359

3360
    #[traced_test]
3361
    #[apply(shared_tokio_runtime)]
3362
    async fn receive_transaction_request() {
3363
        let network = Network::Main;
3364
        let dummy_tx = invalid_empty_single_proof_transaction();
3365
        let txid = dummy_tx.kernel.txid();
3366

3367
        for transaction_is_known in [false, true] {
3368
            let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
3369
                get_test_genesis_setup(network, 1, cli_args::Args::default())
3370
                    .await
3371
                    .unwrap();
3372
            if transaction_is_known {
3373
                state_lock
3374
                    .lock_guard_mut()
3375
                    .await
3376
                    .mempool_insert(dummy_tx.clone(), TransactionOrigin::Own)
3377
                    .await;
3378
            }
3379

3380
            let mock = if transaction_is_known {
3381
                Mock::new(vec![
3382
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3383
                    Action::Write(PeerMessage::Transaction(Box::new(
3384
                        (&dummy_tx).try_into().unwrap(),
3385
                    ))),
3386
                    Action::Read(PeerMessage::Bye),
3387
                ])
3388
            } else {
3389
                Mock::new(vec![
3390
                    Action::Read(PeerMessage::TransactionRequest(txid)),
3391
                    Action::Read(PeerMessage::Bye),
3392
                ])
3393
            };
3394

3395
            let hsd = get_dummy_handshake_data_for_genesis(network);
3396
            let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
3397
            let mut peer_loop_handler = PeerLoopHandler::new(
3398
                to_main_tx,
3399
                state_lock,
3400
                get_dummy_socket_address(0),
3401
                hsd,
3402
                true,
3403
                1,
3404
            );
3405

3406
            peer_loop_handler
3407
                .run(mock, from_main_rx, &mut peer_state)
3408
                .await
3409
                .unwrap();
3410
        }
3411
    }
3412

3413
    #[traced_test]
3414
    #[apply(shared_tokio_runtime)]
3415
    async fn empty_mempool_request_tx_test() {
3416
        // In this scenario the client receives a transaction notification from
3417
        // a peer of a transaction it doesn't know; the client must then request it.
3418

3419
        let network = Network::Main;
3420
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
3421
            get_test_genesis_setup(network, 1, cli_args::Args::default())
3422
                .await
3423
                .unwrap();
3424

3425
        let spending_key = state_lock
3426
            .lock_guard()
3427
            .await
3428
            .wallet_state
3429
            .wallet_entropy
3430
            .nth_symmetric_key_for_tests(0);
3431
        let genesis_block = Block::genesis(network);
3432
        let now = genesis_block.kernel.header.timestamp;
3433
        let config = TxCreationConfig::default()
3434
            .recover_change_off_chain(spending_key.into())
3435
            .with_prover_capability(TxProvingCapability::ProofCollection);
3436
        let transaction_1: Transaction = state_lock
3437
            .api()
3438
            .tx_initiator_internal()
3439
            .create_transaction(
3440
                Default::default(),
3441
                NativeCurrencyAmount::coins(0),
3442
                now,
3443
                config,
3444
            )
3445
            .await
3446
            .unwrap()
3447
            .transaction
3448
            .into();
3449

3450
        // Build the resulting transaction notification
3451
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3452
        let mock = Mock::new(vec![
3453
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3454
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3455
            Action::Read(PeerMessage::Transaction(Box::new(
3456
                (&transaction_1).try_into().unwrap(),
3457
            ))),
3458
            Action::Read(PeerMessage::Bye),
3459
        ]);
3460

3461
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3462

3463
        // Mock a timestamp to allow transaction to be considered valid
3464
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3465
            to_main_tx,
3466
            state_lock.clone(),
3467
            get_dummy_socket_address(0),
3468
            hsd_1.clone(),
3469
            true,
3470
            1,
3471
            now,
3472
        );
3473

3474
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3475

3476
        assert!(
3477
            state_lock.lock_guard().await.mempool.is_empty(),
3478
            "Mempool must be empty at init"
3479
        );
3480
        peer_loop_handler
3481
            .run(mock, from_main_rx_clone, &mut peer_state)
3482
            .await
3483
            .unwrap();
3484

3485
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
3486
        // by the `main_loop`.
3487
        match to_main_rx1.recv().await {
3488
            Some(PeerTaskToMain::Transaction(_)) => (),
3489
            _ => panic!("Must receive remove of peer block max height"),
3490
        };
3491
    }
3492

3493
    #[traced_test]
3494
    #[apply(shared_tokio_runtime)]
3495
    async fn populated_mempool_request_tx_test() -> Result<()> {
3496
        // In this scenario the peer is informed of a transaction that it already knows
3497

3498
        let network = Network::Main;
3499
        let (
3500
            _peer_broadcast_tx,
3501
            from_main_rx_clone,
3502
            to_main_tx,
3503
            mut to_main_rx1,
3504
            mut state_lock,
3505
            _hsd,
3506
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3507
            .await
3508
            .unwrap();
3509
        let spending_key = state_lock
3510
            .lock_guard()
3511
            .await
3512
            .wallet_state
3513
            .wallet_entropy
3514
            .nth_symmetric_key_for_tests(0);
3515

3516
        let genesis_block = Block::genesis(network);
3517
        let now = genesis_block.kernel.header.timestamp;
3518
        let config = TxCreationConfig::default()
3519
            .recover_change_off_chain(spending_key.into())
3520
            .with_prover_capability(TxProvingCapability::ProofCollection);
3521
        let transaction_1: Transaction = state_lock
3522
            .api()
3523
            .tx_initiator_internal()
3524
            .create_transaction(
3525
                Default::default(),
3526
                NativeCurrencyAmount::coins(0),
3527
                now,
3528
                config,
3529
            )
3530
            .await
3531
            .unwrap()
3532
            .transaction
3533
            .into();
3534

3535
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
3536
        let mut peer_loop_handler = PeerLoopHandler::new(
3537
            to_main_tx,
3538
            state_lock.clone(),
3539
            get_dummy_socket_address(0),
3540
            hsd_1.clone(),
3541
            true,
3542
            1,
3543
        );
3544
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
3545

3546
        assert!(
3547
            state_lock.lock_guard().await.mempool.is_empty(),
3548
            "Mempool must be empty at init"
3549
        );
3550
        state_lock
3551
            .lock_guard_mut()
3552
            .await
3553
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
3554
            .await;
3555
        assert!(
3556
            !state_lock.lock_guard().await.mempool.is_empty(),
3557
            "Mempool must be non-empty after insertion"
3558
        );
3559

3560
        // Run the peer loop and verify expected exchange -- namely that the
3561
        // tx notification is received and the the transaction is *not*
3562
        // requested.
3563
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
3564
        let mock = Mock::new(vec![
3565
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3566
            Action::Read(PeerMessage::Bye),
3567
        ]);
3568
        peer_loop_handler
3569
            .run(mock, from_main_rx_clone, &mut peer_state)
3570
            .await
3571
            .unwrap();
3572

3573
        // nothing is allowed to be sent to `main_loop`
3574
        match to_main_rx1.try_recv() {
3575
            Err(TryRecvError::Empty) => (),
3576
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
3577
            Ok(_) => panic!("to_main channel must be empty"),
3578
        };
3579
        Ok(())
3580
    }
3581

3582
    mod block_proposals {
3583
        use super::*;
3584
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3585

3586
        struct TestSetup {
3587
            peer_loop_handler: PeerLoopHandler,
3588
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3589
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3590
            peer_state: MutablePeerState,
3591
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3592
            genesis_block: Block,
3593
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3594
        }
3595

3596
        async fn genesis_setup(network: Network) -> TestSetup {
3597
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
3598
                get_test_genesis_setup(network, 0, cli_args::Args::default())
3599
                    .await
3600
                    .unwrap();
3601
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
3602
            let peer_loop_handler = PeerLoopHandler::new(
3603
                to_main_tx.clone(),
3604
                alice.clone(),
3605
                get_dummy_socket_address(0),
3606
                peer_hsd.clone(),
3607
                true,
3608
                1,
3609
            );
3610
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
3611

3612
            // (peer_loop_handler, to_main_rx1)
3613
            TestSetup {
3614
                peer_broadcast_tx,
3615
                peer_loop_handler,
3616
                to_main_rx,
3617
                from_main_rx,
3618
                peer_state,
3619
                to_main_tx,
3620
                genesis_block: Block::genesis(network),
3621
            }
3622
        }
3623

3624
        #[traced_test]
3625
        #[apply(shared_tokio_runtime)]
3626
        async fn accept_block_proposal_height_one() {
3627
            // Node knows genesis block, receives a block proposal for block 1
3628
            // and must accept this. Verify that main loop is informed of block
3629
            // proposal.
3630
            let TestSetup {
3631
                peer_broadcast_tx,
3632
                mut peer_loop_handler,
3633
                mut to_main_rx,
3634
                from_main_rx,
3635
                mut peer_state,
3636
                to_main_tx,
3637
                genesis_block,
3638
            } = genesis_setup(Network::Main).await;
3639
            let block1 = fake_valid_block_for_tests(
3640
                &peer_loop_handler.global_state_lock,
3641
                StdRng::seed_from_u64(5550001).random(),
3642
            )
3643
            .await;
3644

3645
            let mock = Mock::new(vec![
3646
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
3647
                Action::Read(PeerMessage::Bye),
3648
            ]);
3649
            peer_loop_handler
3650
                .run(mock, from_main_rx, &mut peer_state)
3651
                .await
3652
                .unwrap();
3653

3654
            match to_main_rx.try_recv().unwrap() {
3655
                PeerTaskToMain::BlockProposal(block) => {
3656
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
3657
                }
3658
                _ => panic!("Expected main loop to be informed of block proposal"),
3659
            };
3660

3661
            drop(to_main_tx);
3662
            drop(peer_broadcast_tx);
3663
        }
3664

3665
        #[traced_test]
3666
        #[apply(shared_tokio_runtime)]
3667
        async fn accept_block_proposal_notification_height_one() {
3668
            // Node knows genesis block, receives a block proposal notification
3669
            // for block 1 and must accept this by requesting the block
3670
            // proposal from peer.
3671
            let TestSetup {
3672
                peer_broadcast_tx,
3673
                mut peer_loop_handler,
3674
                to_main_rx: _,
3675
                from_main_rx,
3676
                mut peer_state,
3677
                to_main_tx,
3678
                ..
3679
            } = genesis_setup(Network::Main).await;
3680
            let block1 = fake_valid_block_for_tests(
3681
                &peer_loop_handler.global_state_lock,
3682
                StdRng::seed_from_u64(5550001).random(),
3683
            )
3684
            .await;
3685

3686
            let mock = Mock::new(vec![
3687
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
3688
                Action::Write(PeerMessage::BlockProposalRequest(
3689
                    BlockProposalRequest::new(block1.body().mast_hash()),
3690
                )),
3691
                Action::Read(PeerMessage::Bye),
3692
            ]);
3693
            peer_loop_handler
3694
                .run(mock, from_main_rx, &mut peer_state)
3695
                .await
3696
                .unwrap();
3697

3698
            drop(to_main_tx);
3699
            drop(peer_broadcast_tx);
3700
        }
3701
    }
3702

3703
    mod proof_qualities {
3704
        use strum::IntoEnumIterator;
3705

3706
        use super::*;
3707
        use crate::config_models::cli_args;
3708
        use crate::models::blockchain::transaction::Transaction;
3709
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3710
        use crate::models::state::wallet::transaction_output::TxOutput;
3711
        use crate::tests::shared::mock_genesis_global_state;
3712

3713
        async fn tx_of_proof_quality(
3714
            network: Network,
3715
            quality: TransactionProofQuality,
3716
        ) -> Transaction {
3717
            let wallet_secret = WalletEntropy::devnet_wallet();
3718
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
3719
            let alice_gsl = mock_genesis_global_state(
3720
                1,
3721
                wallet_secret,
3722
                cli_args::Args::default_with_network(network),
3723
            )
3724
            .await;
3725
            let alice = alice_gsl.lock_guard().await;
3726
            let genesis_block = alice.chain.light_state();
3727
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
3728
            let prover_capability = match quality {
3729
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
3730
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
3731
            };
3732
            let config = TxCreationConfig::default()
3733
                .recover_change_off_chain(alice_key.into())
3734
                .with_prover_capability(prover_capability);
3735
            alice_gsl
3736
                .api()
3737
                .tx_initiator_internal()
3738
                .create_transaction(
3739
                    Vec::<TxOutput>::new().into(),
3740
                    NativeCurrencyAmount::coins(1),
3741
                    in_seven_months,
3742
                    config,
3743
                )
3744
                .await
3745
                .unwrap()
3746
                .transaction()
3747
                .clone()
3748
        }
3749

3750
        #[traced_test]
3751
        #[apply(shared_tokio_runtime)]
3752
        async fn client_favors_higher_proof_quality() {
3753
            // In this scenario the peer is informed of a transaction that it
3754
            // already knows, and it's tested that it checks the proof quality
3755
            // field and verifies that it exceeds the proof in the mempool
3756
            // before requesting the transasction.
3757
            let network = Network::Main;
3758
            let proof_collection_tx =
3759
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
3760
            let single_proof_tx =
3761
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
3762

3763
            for (own_tx_pq, new_tx_pq) in
3764
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
3765
            {
3766
                use TransactionProofQuality::*;
3767

3768
                let (
3769
                    _peer_broadcast_tx,
3770
                    from_main_rx_clone,
3771
                    to_main_tx,
3772
                    mut to_main_rx1,
3773
                    mut alice,
3774
                    handshake_data,
3775
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
3776
                    .await
3777
                    .unwrap();
3778

3779
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
3780
                    (ProofCollection, ProofCollection) => {
3781
                        (&proof_collection_tx, &proof_collection_tx)
3782
                    }
3783
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
3784
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
3785
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
3786
                };
3787

3788
                alice
3789
                    .lock_guard_mut()
3790
                    .await
3791
                    .mempool_insert((*own_tx).to_owned(), TransactionOrigin::Foreign)
3792
                    .await;
3793

3794
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
3795

3796
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
3797
                let mock = if own_proof_is_supreme {
3798
                    Mock::new(vec![
3799
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3800
                        Action::Read(PeerMessage::Bye),
3801
                    ])
3802
                } else {
3803
                    Mock::new(vec![
3804
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3805
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
3806
                        Action::Read(PeerMessage::Transaction(Box::new(
3807
                            new_tx.try_into().unwrap(),
3808
                        ))),
3809
                        Action::Read(PeerMessage::Bye),
3810
                    ])
3811
                };
3812

3813
                let now = proof_collection_tx.kernel.timestamp;
3814
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
3815
                    to_main_tx,
3816
                    alice.clone(),
3817
                    get_dummy_socket_address(0),
3818
                    handshake_data.clone(),
3819
                    true,
3820
                    1,
3821
                    now,
3822
                );
3823
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
3824

3825
                peer_loop_handler
3826
                    .run(mock, from_main_rx_clone, &mut peer_state)
3827
                    .await
3828
                    .unwrap();
3829

3830
                if own_proof_is_supreme {
3831
                    match to_main_rx1.try_recv() {
3832
                        Err(TryRecvError::Empty) => (),
3833
                        Err(TryRecvError::Disconnected) => {
3834
                            panic!("to_main channel must still be open")
3835
                        }
3836
                        Ok(_) => panic!("to_main channel must be empty"),
3837
                    }
3838
                } else {
3839
                    match to_main_rx1.try_recv() {
3840
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
3841
                        Err(TryRecvError::Disconnected) => {
3842
                            panic!("to_main channel must still be open")
3843
                        }
3844
                        Ok(PeerTaskToMain::Transaction(_)) => (),
3845
                        _ => panic!("Unexpected result from channel"),
3846
                    }
3847
                }
3848
            }
3849
        }
3850
    }
3851

3852
    mod sync_challenges {
3853
        use super::*;
3854
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3855

3856
        #[traced_test]
3857
        #[apply(shared_tokio_runtime)]
3858
        async fn bad_sync_challenge_height_greater_than_tip() {
3859
            // Criterium: Challenge height may not exceed that of tip in the
3860
            // request.
3861

3862
            let network = Network::Main;
3863
            let (
3864
                _alice_main_to_peer_tx,
3865
                alice_main_to_peer_rx,
3866
                alice_peer_to_main_tx,
3867
                alice_peer_to_main_rx,
3868
                mut alice,
3869
                alice_hsd,
3870
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
3871
                .await
3872
                .unwrap();
3873
            let genesis_block: Block = Block::genesis(network);
3874
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
3875
                &genesis_block,
3876
                Timestamp::hours(1),
3877
                [0u8; 32],
3878
                network,
3879
            )
3880
            .await;
3881
            for block in &blocks {
3882
                alice.set_new_tip(block.clone()).await.unwrap();
3883
            }
3884

3885
            let bh12 = blocks.last().unwrap().header().height;
3886
            let sync_challenge = SyncChallenge {
3887
                tip_digest: blocks[9].hash(),
3888
                challenges: [bh12; 10],
3889
            };
3890
            let alice_p2p_messages = Mock::new(vec![
3891
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
3892
                Action::Read(PeerMessage::Bye),
3893
            ]);
3894

3895
            let peer_address = get_dummy_socket_address(0);
3896
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
3897
                alice_peer_to_main_tx.clone(),
3898
                alice.clone(),
3899
                peer_address,
3900
                alice_hsd,
3901
                false,
3902
                1,
3903
            );
3904
            alice_peer_loop_handler
3905
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
3906
                .await
3907
                .unwrap();
3908

3909
            drop(alice_peer_to_main_rx);
3910

3911
            let latest_sanction = alice
3912
                .lock_guard()
3913
                .await
3914
                .net
3915
                .get_peer_standing_from_database(peer_address.ip())
3916
                .await
3917
                .unwrap();
3918
            assert_eq!(
3919
                NegativePeerSanction::InvalidSyncChallenge,
3920
                latest_sanction
3921
                    .latest_punishment
3922
                    .expect("peer must be sanctioned")
3923
                    .0
3924
            );
3925
        }
3926

3927
        #[traced_test]
3928
        #[apply(shared_tokio_runtime)]
3929
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
3930
            // Criterium: Challenge may not point to genesis block, or block 1, as
3931
            // tip.
3932

3933
            let network = Network::Main;
3934
            let genesis_block: Block = Block::genesis(network);
3935

3936
            let alice_cli = cli_args::Args::default();
3937
            let (
3938
                _alice_main_to_peer_tx,
3939
                alice_main_to_peer_rx,
3940
                alice_peer_to_main_tx,
3941
                alice_peer_to_main_rx,
3942
                alice,
3943
                alice_hsd,
3944
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
3945

3946
            let sync_challenge = SyncChallenge {
3947
                tip_digest: genesis_block.hash(),
3948
                challenges: [BlockHeight::genesis(); 10],
3949
            };
3950

3951
            let alice_p2p_messages = Mock::new(vec![
3952
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
3953
                Action::Read(PeerMessage::Bye),
3954
            ]);
3955

3956
            let peer_address = get_dummy_socket_address(0);
3957
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
3958
                alice_peer_to_main_tx.clone(),
3959
                alice.clone(),
3960
                peer_address,
3961
                alice_hsd,
3962
                false,
3963
                1,
3964
            );
3965
            alice_peer_loop_handler
3966
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
3967
                .await
3968
                .unwrap();
3969

3970
            drop(alice_peer_to_main_rx);
3971

3972
            let latest_sanction = alice
3973
                .lock_guard()
3974
                .await
3975
                .net
3976
                .get_peer_standing_from_database(peer_address.ip())
3977
                .await
3978
                .unwrap();
3979
            assert_eq!(
3980
                NegativePeerSanction::InvalidSyncChallenge,
3981
                latest_sanction
3982
                    .latest_punishment
3983
                    .expect("peer must be sanctioned")
3984
                    .0
3985
            );
3986
        }
3987

3988
        #[traced_test]
3989
        #[apply(shared_tokio_runtime)]
3990
        async fn sync_challenge_happy_path() -> Result<()> {
3991
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
3992
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
3993
            // sync mode.
3994

3995
            let mut rng = rand::rng();
3996
            let network = Network::Main;
3997
            let genesis_block: Block = Block::genesis(network);
3998

3999
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
4000
            let alice_cli = cli_args::Args {
4001
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
4002
                ..Default::default()
4003
            };
4004
            let (
4005
                _alice_main_to_peer_tx,
4006
                alice_main_to_peer_rx,
4007
                alice_peer_to_main_tx,
4008
                mut alice_peer_to_main_rx,
4009
                mut alice,
4010
                alice_hsd,
4011
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
4012
            let _alice_socket_address = get_dummy_socket_address(0);
4013

4014
            let (
4015
                _bob_main_to_peer_tx,
4016
                _bob_main_to_peer_rx,
4017
                _bob_peer_to_main_tx,
4018
                _bob_peer_to_main_rx,
4019
                mut bob,
4020
                _bob_hsd,
4021
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
4022
            let bob_socket_address = get_dummy_socket_address(0);
4023

4024
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
4025
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
4026
            assert!(
4027
                block_1.is_valid(&genesis_block, now, network).await,
4028
                "Block must be valid for this test to make sense"
4029
            );
4030
            let alice_tip = &block_1;
4031
            alice.set_new_tip(block_1.clone()).await?;
4032
            bob.set_new_tip(block_1.clone()).await?;
4033

4034
            // produce enough blocks to ensure alice needs to go into sync mode
4035
            // with this block notification.
4036
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
4037
                &block_1,
4038
                network.target_block_interval(),
4039
                rng.random(),
4040
                network,
4041
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
4042
            )
4043
            .await;
4044
            for block in &blocks {
4045
                bob.set_new_tip(block.clone()).await?;
4046
            }
4047
            let bob_tip = blocks.last().unwrap();
4048

4049
            let block_notification_from_bob = PeerBlockNotification {
4050
                hash: bob_tip.hash(),
4051
                height: bob_tip.header().height,
4052
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
4053
            };
4054

4055
            let alice_rng_seed = rng.random::<[u8; 32]>();
4056
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
4057
            let sync_challenge_from_alice = SyncChallenge::generate(
4058
                &block_notification_from_bob,
4059
                alice_tip.header().height,
4060
                alice_rng_clone.random(),
4061
            );
4062

4063
            println!(
4064
                "sync challenge from alice:\n{:?}",
4065
                sync_challenge_from_alice
4066
            );
4067

4068
            let sync_challenge_response_from_bob = bob
4069
                .lock_guard()
4070
                .await
4071
                .response_to_sync_challenge(sync_challenge_from_alice)
4072
                .await
4073
                .expect("should be able to respond to sync challenge");
4074

4075
            let alice_p2p_messages = Mock::new(vec![
4076
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4077
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
4078
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
4079
                    sync_challenge_response_from_bob,
4080
                ))),
4081
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
4082
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
4083
                // The absence of a Write here checks that a 2nd challenge isn't sent
4084
                // when a successful was just received.
4085
                Action::Read(PeerMessage::Bye),
4086
            ]);
4087

4088
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
4089
                alice_peer_to_main_tx.clone(),
4090
                alice.clone(),
4091
                bob_socket_address,
4092
                alice_hsd,
4093
                false,
4094
                1,
4095
                bob_tip.header().timestamp,
4096
            );
4097
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
4098
            alice_peer_loop_handler
4099
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
4100
                .await?;
4101

4102
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
4103
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
4104
            expected_anchor_mmra.append(bob_tip.hash());
4105
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
4106
                peer_address: bob_socket_address,
4107
                claimed_height: bob_tip.header().height,
4108
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
4109
                claimed_block_mmra: expected_anchor_mmra,
4110
            };
4111
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
4112
            assert_eq!(
4113
                expected_message_from_alice_peer_loop,
4114
                observed_message_from_alice_peer_loop
4115
            );
4116

4117
            Ok(())
4118
        }
4119
    }
4120
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc