• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 13936939989

19 Mar 2025 02:10AM UTC coverage: 84.361% (+0.08%) from 84.279%
13936939989

Pull #512

github

GitHub
Merge 7b4847cb1 into bba442d24
Pull Request #512: `proptest_state_machine` over `PeerMessage`

9 of 26 new or added lines in 7 files covered. (34.62%)

247 existing lines in 3 files now uncovered.

50825 of 60247 relevant lines covered (84.36%)

178616.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.98
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
66
const MAX_PEER_LIST_LENGTH: usize = 10;
67
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
68

69
const KEEP_CONNECTION_ALIVE: bool = false;
70
const DISCONNECT_CONNECTION: bool = true;
71

72
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP
75
///
76
/// also handles messages from main task over the main-to-peer-tasks broadcast
77
/// channel.
78
#[derive(Debug, Clone)]
79
pub struct PeerLoopHandler {
80
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
81
    global_state_lock: GlobalStateLock,
82
    peer_address: SocketAddr,
83
    peer_handshake_data: HandshakeData,
84
    inbound_connection: bool,
85
    distance: u8,
86
    rng: StdRng,
87
    #[cfg(test)]
88
    mock_now: Option<Timestamp>,
89
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
24✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
24✔
94
        global_state_lock: GlobalStateLock,
24✔
95
        peer_address: SocketAddr,
24✔
96
        peer_handshake_data: HandshakeData,
24✔
97
        inbound_connection: bool,
24✔
98
        distance: u8,
24✔
99
    ) -> Self {
24✔
100
        Self {
24✔
101
            to_main_tx,
24✔
102
            global_state_lock,
24✔
103
            peer_address,
24✔
104
            peer_handshake_data,
24✔
105
            inbound_connection,
24✔
106
            distance,
24✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
24✔
108
            #[cfg(test)]
24✔
109
            mock_now: None,
24✔
110
        }
24✔
111
    }
24✔
112

113
    /// Allows for mocked timestamps such that time dependencies may be tested.
114
    #[cfg(test)]
115
    pub(crate) fn with_mocked_time(
20✔
116
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
20✔
117
        global_state_lock: GlobalStateLock,
20✔
118
        peer_address: SocketAddr,
20✔
119
        peer_handshake_data: HandshakeData,
20✔
120
        inbound_connection: bool,
20✔
121
        distance: u8,
20✔
122
        mocked_time: Timestamp,
20✔
123
    ) -> Self {
20✔
124
        Self {
20✔
125
            to_main_tx,
20✔
126
            global_state_lock,
20✔
127
            peer_address,
20✔
128
            peer_handshake_data,
20✔
129
            inbound_connection,
20✔
130
            distance,
20✔
131
            mock_now: Some(mocked_time),
20✔
132
            rng: StdRng::from_rng(&mut rand::rng()),
20✔
133
        }
20✔
134
    }
20✔
135

136
    /// Overwrite the random number generator object with a specific one.
137
    ///
138
    /// Useful for derandomizing tests.
139
    #[cfg(test)]
140
    fn set_rng(&mut self, rng: StdRng) {
1✔
141
        self.rng = rng;
1✔
142
    }
1✔
143

144
    fn now(&self) -> Timestamp {
30✔
145
        #[cfg(not(test))]
30✔
146
        {
30✔
147
            Timestamp::now()
30✔
148
        }
30✔
149
        #[cfg(test)]
30✔
150
        {
30✔
151
            self.mock_now.unwrap_or(Timestamp::now())
30✔
152
        }
30✔
153
    }
30✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
27✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
27✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
27✔
164
        debug!(
27✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
27✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
27✔
178
        if let Err(err) = sanction_result {
27✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
26✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
27✔
183
    }
27✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
8✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
8✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
8✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
8✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
7✔
199
        if sanction_result.is_err() {
7✔
200
            error!("Cannot reward banned peer");
×
201
        }
7✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
7✔
204
    }
8✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

16✔
235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
255
    /// valid. Use with a list of blocks or a single block. When the
256
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
257
    /// list is the `i`th block. The parent of element zero in this list is
258
    /// `parent_of_first_block`.
259
    ///
260
    /// # Return Value
261
    ///  - `Err` when the connection should be closed;
262
    ///  - `Ok(None)` if some block is invalid
263
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
264
    ///    are not syncing;
265
    ///  - `Ok(None)` if the last block has insufficient height and we are
266
    ///    syncing;
267
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
268
    ///    highest height in the batch.
269
    ///
270
    /// A return value of Ok(Some(_)) means that the message was passed on to
271
    /// main loop.
272
    ///
273
    /// # Locking
274
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
275
    ///     `self.reward(..)`.
276
    ///
277
    /// # Panics
278
    ///
279
    ///  - Panics if called with the empty list.
280
    async fn handle_blocks(
8✔
281
        &mut self,
8✔
282
        received_blocks: Vec<Block>,
8✔
283
        parent_of_first_block: Block,
8✔
284
    ) -> Result<Option<BlockHeight>> {
8✔
285
        debug!(
8✔
286
            "attempting to validate {} {}",
×
287
            received_blocks.len(),
×
288
            if received_blocks.len() == 1 {
×
289
                "block"
×
290
            } else {
291
                "blocks"
×
292
            }
293
        );
294
        let now = self.now();
8✔
295
        debug!("validating with respect to current timestamp {now}");
8✔
296
        let mut previous_block = &parent_of_first_block;
8✔
297
        for new_block in &received_blocks {
24✔
298
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
17✔
299
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
17✔
300
            let new_block_is_valid = new_block.is_valid(previous_block, now).await;
17✔
301
            debug!("new block is valid? {new_block_is_valid}");
17✔
302
            if !new_block_has_proof_of_work {
17✔
303
                warn!(
1✔
304
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
305
                    new_block.kernel.header.height, self.peer_address
×
306
                );
307
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
308
                warn!(
1✔
309
                    "Proof of work should be {} (or more) but was [{}].",
×
310
                    previous_block.kernel.header.difficulty.target(),
×
311
                    new_block.hash().values().iter().join(", ")
×
312
                );
313
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
314
                    new_block.kernel.header.height,
1✔
315
                    new_block.hash(),
1✔
316
                )))
1✔
317
                .await?;
1✔
318
                warn!("Failed to validate block due to insufficient PoW");
1✔
319
                return Ok(None);
1✔
320
            } else if !new_block_is_valid {
16✔
321
                warn!(
×
322
                    "Received invalid block of height {} from peer with IP {}",
×
323
                    new_block.kernel.header.height, self.peer_address
×
324
                );
325
                self.punish(NegativePeerSanction::InvalidBlock((
×
326
                    new_block.kernel.header.height,
×
327
                    new_block.hash(),
×
328
                )))
×
329
                .await?;
×
330
                warn!("Failed to validate block: invalid block");
×
331
                return Ok(None);
×
332
            }
16✔
333
            info!(
16✔
334
                "Block with height {} is valid. mined: {}",
×
335
                new_block.kernel.header.height,
×
336
                new_block.kernel.header.timestamp.standard_format()
×
337
            );
338

339
            previous_block = new_block;
16✔
340
        }
341

342
        // evaluate the fork choice rule
343
        debug!("Checking last block's canonicity ...");
7✔
344
        let last_block = received_blocks.last().unwrap();
7✔
345
        let is_canonical = self
7✔
346
            .global_state_lock
7✔
347
            .lock_guard()
7✔
348
            .await
7✔
349
            .incoming_block_is_more_canonical(last_block);
7✔
350
        let last_block_height = last_block.header().height;
7✔
351
        let sync_mode_active_and_have_new_champion = self
7✔
352
            .global_state_lock
7✔
353
            .lock_guard()
7✔
354
            .await
7✔
355
            .net
356
            .sync_anchor
357
            .as_ref()
7✔
358
            .is_some_and(|x| {
7✔
359
                x.champion
×
360
                    .is_none_or(|(height, _)| height < last_block_height)
×
361
            });
7✔
362
        if !is_canonical && !sync_mode_active_and_have_new_champion {
7✔
363
            warn!(
1✔
364
                "Received {} blocks from peer but incoming blocks are less \
×
365
            canonical than current tip, or current sync-champion.",
×
366
                received_blocks.len()
×
367
            );
368
            return Ok(None);
1✔
369
        }
6✔
370

6✔
371
        // Send the new blocks to the main task which handles the state update
6✔
372
        // and storage to the database.
6✔
373
        let number_of_received_blocks = received_blocks.len();
6✔
374
        self.to_main_tx
6✔
375
            .send(PeerTaskToMain::NewBlocks(received_blocks))
6✔
376
            .await?;
6✔
377
        info!(
6✔
378
            "Updated block info by block from peer. block height {}",
×
379
            last_block_height
380
        );
381

382
        // Valuable, new, hard-to-produce information. Reward peer.
383
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
6✔
384
            .await?;
6✔
385

386
        Ok(Some(last_block_height))
6✔
387
    }
8✔
388

389
    /// Take a single block received from a peer and (attempt to) find a path
390
    /// between the received block and a common ancestor stored in the blocks
391
    /// database.
392
    ///
393
    /// This function attempts to find the parent of the received block, either
394
    /// by searching the database or by requesting it from a peer.
395
    ///  - If the parent is not stored, it is requested from the peer and the
396
    ///    received block is pushed to the fork reconciliation list for later
397
    ///    handling by this function. The fork reconciliation list starts out
398
    ///    empty, but grows as more parents are requested and transmitted.
399
    ///  - If the parent is found in the database, a) block handling continues:
400
    ///    the entire list of fork reconciliation blocks are passed down the
401
    ///    pipeline, potentially leading to a state update; and b) the fork
402
    ///    reconciliation list is cleared.
403
    ///
404
    /// Locking:
405
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
406
    ///     `self.reward(..)`.
407
    async fn try_ensure_path<S>(
22✔
408
        &mut self,
22✔
409
        received_block: Box<Block>,
22✔
410
        peer: &mut S,
22✔
411
        peer_state: &mut MutablePeerState,
22✔
412
    ) -> Result<()>
22✔
413
    where
22✔
414
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
22✔
415
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
22✔
416
        <S as TryStream>::Error: std::error::Error,
22✔
417
    {
22✔
418
        // Does the received block match the fork reconciliation list?
419
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
22✔
420
            peer_state.fork_reconciliation_blocks.last()
22✔
421
        {
422
            let valid = successor
10✔
423
                .is_valid(received_block.as_ref(), self.now())
10✔
424
                .await;
10✔
425
            if !valid {
10✔
426
                warn!(
×
427
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
428
                        peer_state.fork_reconciliation_blocks.len() + 1
×
429
                    );
430
            }
10✔
431
            valid
10✔
432
        } else {
433
            true
12✔
434
        };
435

436
        // Are we running out of RAM?
437
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
22✔
438
            >= self.global_state_lock.cli().sync_mode_threshold;
22✔
439
        if too_many_blocks {
22✔
440
            warn!(
1✔
441
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
442
                peer_state.fork_reconciliation_blocks.len() + 1
×
443
            );
444
        }
21✔
445

446
        // Block mismatch or too many blocks: abort!
447
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
22✔
448
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
449
                received_block.header().height,
1✔
450
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
451
                received_block.hash(),
1✔
452
            )))
1✔
453
            .await?;
1✔
454
            peer_state.fork_reconciliation_blocks = vec![];
1✔
455
            return Ok(());
1✔
456
        }
21✔
457

21✔
458
        // otherwise, append
21✔
459
        peer_state.fork_reconciliation_blocks.push(*received_block);
21✔
460

21✔
461
        // Try fetch parent
21✔
462
        let received_block_header = *peer_state
21✔
463
            .fork_reconciliation_blocks
21✔
464
            .last()
21✔
465
            .unwrap()
21✔
466
            .header();
21✔
467

21✔
468
        let parent_digest = received_block_header.prev_block_digest;
21✔
469
        let parent_height = received_block_header.height.previous()
21✔
470
            .expect("transferred block must have previous height because genesis block cannot be transferred");
21✔
471
        debug!("Try ensure path: fetching parent block");
21✔
472
        let parent_block = self
21✔
473
            .global_state_lock
21✔
474
            .lock_guard()
21✔
475
            .await
21✔
476
            .chain
477
            .archival_state()
21✔
478
            .get_block(parent_digest)
21✔
479
            .await?;
21✔
480
        debug!(
21✔
481
            "Completed parent block fetching from DB: {}",
×
482
            if parent_block.is_some() {
×
483
                "found".to_string()
×
484
            } else {
485
                "not found".to_string()
×
486
            }
487
        );
488

489
        // If parent is not known (but not genesis) request it.
490
        let Some(parent_block) = parent_block else {
21✔
491
            if parent_height.is_genesis() {
13✔
492
                peer_state.fork_reconciliation_blocks.clear();
1✔
493
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
494
                return Ok(());
×
495
            }
12✔
496
            info!(
12✔
497
                "Parent not known: Requesting previous block with height {} from peer",
×
498
                parent_height
499
            );
500

501
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
12✔
502
                .await?;
12✔
503

504
            return Ok(());
12✔
505
        };
506

507
        // We want to treat the received fork reconciliation blocks (plus the
508
        // received block) in reverse order, from oldest to newest, because
509
        // they were requested from high to low block height.
510
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
8✔
511
        new_blocks.reverse();
8✔
512

8✔
513
        // Reset the fork resolution state since we got all the way back to a
8✔
514
        // block that we have.
8✔
515
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
8✔
516
        peer_state.fork_reconciliation_blocks.clear();
8✔
517

518
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
8✔
519
            // If `BlockNotification` was received during a block reconciliation
520
            // event, then the peer might have one (or more (unlikely)) blocks
521
            // that we do not have. We should thus request those blocks.
522
            if fork_reconciliation_event
6✔
523
                && peer_state.highest_shared_block_height > new_block_height
6✔
524
            {
525
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
526
                    peer_state.highest_shared_block_height,
1✔
527
                ))
1✔
528
                .await?;
1✔
529
            }
5✔
530
        }
2✔
531

532
        Ok(())
8✔
533
    }
22✔
534

535
    // #[cfg(test)]
536
    // pub(crate) async fn handle_peer_message_test(
537
    //     &mut self,
538
    //     msg: PeerMessage,
539
    //     peer: &mut crate::tests::shared::Mock<PeerMessage>,
540
    //     peer_state_info: &mut MutablePeerState,
541
    // ) -> Result<bool> {self.handle_peer_message(msg, peer, peer_state_info).await}
542

543
    /// Handle peer messages and returns Ok(true) if connection should be closed.
544
    /// Connection should also be closed if an error is returned.
545
    /// Otherwise, returns OK(false).
546
    ///
547
    /// Locking:
548
    ///   * Acquires `global_state_lock` for read.
549
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
550
    ///     `self.reward(..)`.
551
    async fn handle_peer_message<S>(
144✔
552
        &mut self,
144✔
553
        msg: PeerMessage,
144✔
554
        peer: &mut S,
144✔
555
        peer_state_info: &mut MutablePeerState,
144✔
556
    ) -> Result<bool>
144✔
557
    where
144✔
558
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
144✔
559
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
144✔
560
        <S as TryStream>::Error: std::error::Error,
144✔
561
    {
144✔
562
        debug!(
144✔
563
            "Received {} from peer {}",
×
564
            msg.get_type(),
×
565
            self.peer_address
566
        );
567
        match msg {
144✔
568
            PeerMessage::Bye => {
569
                // Note that the current peer is not removed from the global_state.peer_map here
570
                // but that this is done by the caller.
571
                info!("Got bye. Closing connection to peer");
38✔
572
                Ok(DISCONNECT_CONNECTION)
38✔
573
            }
574
            PeerMessage::PeerListRequest => {
575
                let peer_info = {
8✔
576
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
8✔
577

578
                    // We are interested in the address on which peers accept ingoing connections,
579
                    // not in the address in which they are connected to us. We are only interested in
580
                    // peers that accept incoming connections.
581
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
8✔
582
                        .global_state_lock
8✔
583
                        .lock_guard()
8✔
584
                        .await
8✔
585
                        .net
586
                        .peer_map
587
                        .values()
8✔
588
                        .filter(|peer_info| peer_info.listen_address().is_some())
23✔
589
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
8✔
590
                        .map(|peer_info| {
23✔
591
                            (
23✔
592
                                // unwrap is safe bc of above `filter`
23✔
593
                                peer_info.listen_address().unwrap(),
23✔
594
                                peer_info.instance_id(),
23✔
595
                            )
23✔
596
                        })
23✔
597
                        .collect();
8✔
598

8✔
599
                    // We sort the returned list, so this function is easier to test
8✔
600
                    peer_info.sort_by_cached_key(|x| x.0);
23✔
601
                    peer_info
8✔
602
                };
8✔
603

8✔
604
                debug!("Responding with: {:?}", peer_info);
8✔
605
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
8✔
606
                Ok(KEEP_CONNECTION_ALIVE)
8✔
607
            }
608
            PeerMessage::PeerListResponse(peers) => {
×
609
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
×
610

×
611
                if peers.len() > MAX_PEER_LIST_LENGTH {
×
612
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
613
                        .await?;
×
614
                }
×
615
                self.to_main_tx
×
616
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
×
617
                        peers,
×
618
                        self.peer_address,
×
619
                        // The distance to the revealed peers is 1 + this peer's distance
×
620
                        self.distance + 1,
×
621
                    )))
×
622
                    .await?;
×
623
                Ok(KEEP_CONNECTION_ALIVE)
×
624
            }
625
            PeerMessage::BlockNotificationRequest => {
626
                debug!("Got BlockNotificationRequest");
5✔
627

628
                peer.send(PeerMessage::BlockNotification(
5✔
629
                    self.global_state_lock
5✔
630
                        .lock_guard()
5✔
631
                        .await
5✔
632
                        .chain
633
                        .light_state()
5✔
634
                        .into(),
5✔
635
                ))
5✔
636
                .await?;
5✔
637

638
                Ok(KEEP_CONNECTION_ALIVE)
5✔
639
            }
640
            PeerMessage::BlockNotification(block_notification) => {
5✔
641
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
642

643
                let (tip_header, sync_anchor_is_set) = {
5✔
644
                    let state = self.global_state_lock.lock_guard().await;
5✔
645
                    (
5✔
646
                        *state.chain.light_state().header(),
5✔
647
                        state.net.sync_anchor.is_some(),
5✔
648
                    )
5✔
649
                };
5✔
650
                debug!(
5✔
651
                    "Got BlockNotification of height {}. Own height is {}",
×
652
                    block_notification.height, tip_header.height
653
                );
654

655
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
5✔
656
                let now = self.now();
5✔
657
                let time_since_latest_successful_challenge = peer_state_info
5✔
658
                    .successful_sync_challenge_response_time
5✔
659
                    .map(|then| now - then);
5✔
660
                let cooldown_expired = time_since_latest_successful_challenge
5✔
661
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
5✔
662
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
5✔
663
                    &tip_header,
5✔
664
                    block_notification.height,
5✔
665
                    block_notification.cumulative_proof_of_work,
5✔
666
                    sync_mode_threshold,
5✔
667
                );
5✔
668
                if cooldown_expired && exceeds_sync_mode_threshold {
5✔
669
                    debug!("sync mode criterion satisfied.");
2✔
670

671
                    if peer_state_info.sync_challenge.is_some() {
2✔
672
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
673
                        return Ok(KEEP_CONNECTION_ALIVE);
×
674
                    }
2✔
675

2✔
676
                    info!(
2✔
677
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
678
                    );
679
                    let challenge = SyncChallenge::generate(
2✔
680
                        &block_notification,
2✔
681
                        tip_header.height,
2✔
682
                        self.rng.random(),
2✔
683
                    );
2✔
684
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
2✔
685
                        challenge,
2✔
686
                        block_notification.cumulative_proof_of_work,
2✔
687
                        self.now(),
2✔
688
                    ));
2✔
689

2✔
690
                    debug!("sending challenge ...");
2✔
691
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
2✔
692

693
                    return Ok(KEEP_CONNECTION_ALIVE);
2✔
694
                }
3✔
695

3✔
696
                peer_state_info.highest_shared_block_height = block_notification.height;
3✔
697
                let block_is_new = tip_header.cumulative_proof_of_work
3✔
698
                    < block_notification.cumulative_proof_of_work;
3✔
699

3✔
700
                debug!("block_is_new: {}", block_is_new);
3✔
701

702
                if block_is_new
3✔
703
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
3✔
704
                    && !sync_anchor_is_set
2✔
705
                    && !exceeds_sync_mode_threshold
2✔
706
                {
707
                    debug!(
1✔
708
                        "sending BlockRequestByHeight to peer for block with height {}",
×
709
                        block_notification.height
710
                    );
711
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
1✔
712
                        .await?;
1✔
713
                } else {
714
                    debug!(
2✔
715
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
716
                        block_notification.height,
×
717
                        block_is_new,
×
718
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
719
                    );
720
                }
721

722
                Ok(KEEP_CONNECTION_ALIVE)
3✔
723
            }
724
            PeerMessage::SyncChallenge(sync_challenge) => {
8✔
725
                let response = {
×
726
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
8✔
727

8✔
728
                    info!("Got sync challenge from {}", self.peer_address.ip());
8✔
729

730
                    let response = self
8✔
731
                        .global_state_lock
8✔
732
                        .lock_guard()
8✔
733
                        .await
8✔
734
                        .response_to_sync_challenge(sync_challenge)
8✔
735
                        .await;
8✔
736

737
                    match response {
8✔
738
                        Ok(resp) => resp,
×
739
                        Err(e) => {
8✔
740
                            warn!("could not generate sync challenge response:\n{e}");
8✔
741
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
8✔
742
                                .await?;
8✔
743
                            return Ok(KEEP_CONNECTION_ALIVE);
8✔
744
                        }
745
                    }
746
                };
747

748
                info!(
×
749
                    "Responding to sync challenge from {}",
×
750
                    self.peer_address.ip()
×
751
                );
752
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
753
                    .await?;
×
754

755
                Ok(KEEP_CONNECTION_ALIVE)
×
756
            }
757
            PeerMessage::SyncChallengeResponse(challenge_response) => {
3✔
758
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
759

760
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
3✔
761
                info!(
3✔
762
                    "Got sync challenge response from {}",
×
763
                    self.peer_address.ip()
×
764
                );
765

766
                // The purpose of the sync challenge and sync challenge response
767
                // is to avoid going into sync mode based on a malicious target
768
                // fork. Instead of verifying that the claimed proof-of-work
769
                // number is correct (which would require sending and verifying,
770
                // at least, all blocks between luca (whatever that is) and the
771
                // claimed tip), we use a heuristic that requires less
772
                // communication and less verification work. The downside of
773
                // using a heuristic here is a nonzero false positive and false
774
                // negative rate. Note that the false negative event
775
                // (maliciously sending someone into sync mode based on a bogus
776
                // fork) still requires a significant amount of work from the
777
                // attacker, *in addition* to being lucky. Also, down the line
778
                // succinctness (and more specifically, recursive block
779
                // validation) obviates this entire subprotocol.
780

781
                // Did we issue a challenge?
782
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
3✔
783
                    warn!("Sync challenge response was not prompted.");
2✔
784
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
2✔
785
                        .await?;
2✔
786
                    return Ok(KEEP_CONNECTION_ALIVE);
2✔
787
                };
788

789
                // Reset the challenge, regardless of the response's success.
790
                peer_state_info.sync_challenge = None;
1✔
791

1✔
792
                // Does response match issued challenge?
1✔
793
                if !challenge_response.matches(issued_challenge) {
1✔
794
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
795
                        .await?;
×
796
                    return Ok(KEEP_CONNECTION_ALIVE);
×
797
                }
1✔
798

1✔
799
                // Does response verify?
1✔
800
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
801
                let now = self.now();
1✔
802
                if !challenge_response.is_valid(now).await {
1✔
803
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
804
                        .await?;
×
805
                    return Ok(KEEP_CONNECTION_ALIVE);
×
806
                }
1✔
807

808
                // Does cumulative proof-of-work evolve reasonably?
809
                let own_tip_header = *self
1✔
810
                    .global_state_lock
1✔
811
                    .lock_guard()
1✔
812
                    .await
1✔
813
                    .chain
814
                    .light_state()
1✔
815
                    .header();
1✔
816
                if !challenge_response
1✔
817
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
818
                {
819
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
820
                        .await?;
×
821
                    return Ok(KEEP_CONNECTION_ALIVE);
×
822
                }
1✔
823

1✔
824
                // Is there some specific (*i.e.*, not aggregate) proof of work?
1✔
825
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
826
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
827
                        .await?;
×
828
                    return Ok(KEEP_CONNECTION_ALIVE);
×
829
                }
1✔
830

1✔
831
                // Did it come in time?
1✔
832
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
833
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
834
                        .await?;
×
835
                    return Ok(KEEP_CONNECTION_ALIVE);
×
836
                }
1✔
837

1✔
838
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
839
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
840

1✔
841
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
842
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
843

1✔
844
                // Inform main loop
1✔
845
                self.to_main_tx
1✔
846
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
847
                        peer_address: self.peer_address,
1✔
848
                        claimed_height: claimed_tip_height,
1✔
849
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
850
                        claimed_block_mmra: sync_mmra_anchor,
1✔
851
                    })
1✔
852
                    .await?;
1✔
853

854
                Ok(KEEP_CONNECTION_ALIVE)
1✔
855
            }
856
            PeerMessage::BlockRequestByHash(block_digest) => {
13✔
857
                match self
13✔
858
                    .global_state_lock
13✔
859
                    .lock_guard()
13✔
860
                    .await
13✔
861
                    .chain
862
                    .archival_state()
13✔
863
                    .get_block(block_digest)
13✔
864
                    .await?
13✔
865
                {
866
                    None => {
867
                        // TODO: Consider punishing here
868
                        warn!("Peer requested unknown block with hash {}", block_digest);
11✔
869
                        Ok(KEEP_CONNECTION_ALIVE)
11✔
870
                    }
871
                    Some(b) => {
×
872
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
873
                            .await?;
×
874
                        Ok(KEEP_CONNECTION_ALIVE)
×
875
                    }
876
                }
877
            }
878
            PeerMessage::BlockRequestByHeight(block_height) => {
8✔
879
                let block_response = {
3✔
880
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
8✔
881

8✔
882
                    debug!("Got BlockRequestByHeight of height {}", block_height);
8✔
883

884
                    let canonical_block_digest = self
8✔
885
                        .global_state_lock
8✔
886
                        .lock_guard()
8✔
887
                        .await
8✔
888
                        .chain
889
                        .archival_state()
8✔
890
                        .archival_block_mmr
8✔
891
                        .ammr()
8✔
892
                        .try_get_leaf(block_height.into())
8✔
893
                        .await;
8✔
894

895
                    let canonical_block_digest = match canonical_block_digest {
8✔
896
                        None => {
897
                            let own_tip_height = self
5✔
898
                                .global_state_lock
5✔
899
                                .lock_guard()
5✔
900
                                .await
5✔
901
                                .chain
902
                                .light_state()
5✔
903
                                .header()
5✔
904
                                .height;
5✔
905
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
5✔
906
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
5✔
907
                                .await?;
5✔
908

909
                            return Ok(KEEP_CONNECTION_ALIVE);
5✔
910
                        }
911
                        Some(digest) => digest,
3✔
912
                    };
913

914
                    let canonical_chain_block: Block = self
3✔
915
                        .global_state_lock
3✔
916
                        .lock_guard()
3✔
917
                        .await
3✔
918
                        .chain
919
                        .archival_state()
3✔
920
                        .get_block(canonical_block_digest)
3✔
921
                        .await?
3✔
922
                        .unwrap();
3✔
923

3✔
924
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
3✔
925
                };
3✔
926

3✔
927
                debug!("Sending block");
3✔
928
                peer.send(block_response).await?;
3✔
929
                debug!("Sent block");
3✔
930
                Ok(KEEP_CONNECTION_ALIVE)
3✔
931
            }
932
            PeerMessage::Block(t_block) => {
22✔
933
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
22✔
934

22✔
935
                info!(
22✔
936
                    "Got new block from peer {}, height {}, mined {}",
×
937
                    self.peer_address,
×
938
                    t_block.header.height,
×
939
                    t_block.header.timestamp.standard_format()
×
940
                );
941
                let new_block_height = t_block.header.height;
22✔
942

943
                let block = match Block::try_from(*t_block) {
22✔
944
                    Ok(block) => Box::new(block),
22✔
945
                    Err(e) => {
×
946
                        warn!("Peer sent invalid block: {e:?}");
×
947
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
948
                            .await?;
×
949

950
                        return Ok(KEEP_CONNECTION_ALIVE);
×
951
                    }
952
                };
953

954
                // Update the value for the highest known height that peer possesses iff
955
                // we are not in a fork reconciliation state.
956
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
22✔
957
                    peer_state_info.highest_shared_block_height = new_block_height;
12✔
958
                }
12✔
959

960
                self.try_ensure_path(block, peer, peer_state_info).await?;
22✔
961

962
                // Reward happens as part of `try_ensure_path`
963

964
                Ok(KEEP_CONNECTION_ALIVE)
21✔
965
            }
966
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
967
                known_blocks,
8✔
968
                max_response_len,
8✔
969
                anchor,
8✔
970
            }) => {
8✔
971
                debug!(
8✔
972
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
973
                    self.peer_address
974
                );
975

976
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
977
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
978
                        .await?;
×
979

980
                    return Ok(KEEP_CONNECTION_ALIVE);
×
981
                }
8✔
982

983
                // The last block in the list of the peers known block is the
984
                // earliest block, block with lowest height, the peer has
985
                // requested. If it does not belong to canonical chain, none of
986
                // the later will. So we can do an early abort in that case.
987
                let least_preferred = match known_blocks.last() {
8✔
988
                    Some(least_preferred) => *least_preferred,
8✔
989
                    None => {
990
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
991
                            .await?;
×
992

993
                        return Ok(KEEP_CONNECTION_ALIVE);
×
994
                    }
995
                };
996

997
                let state = self.global_state_lock.lock_guard().await;
8✔
998
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
999
                let luca_is_known = state
8✔
1000
                    .chain
8✔
1001
                    .archival_state()
8✔
1002
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1003
                    .await;
8✔
1004
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
1005
                    drop(state);
×
1006
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1007
                        .await?;
×
1008
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1009

1010
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1011
                }
8✔
1012

1013
                // Happy case: At least *one* of the blocks referenced by peer
1014
                // is known to us.
1015
                let first_block_in_response = {
8✔
1016
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1017
                    for block_digest in known_blocks {
10✔
1018
                        if state
10✔
1019
                            .chain
10✔
1020
                            .archival_state()
10✔
1021
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1022
                            .await
10✔
1023
                        {
1024
                            let height = state
8✔
1025
                                .chain
8✔
1026
                                .archival_state()
8✔
1027
                                .get_block_header(block_digest)
8✔
1028
                                .await
8✔
1029
                                .unwrap()
8✔
1030
                                .height;
8✔
1031
                            first_block_in_response = Some(height);
8✔
1032
                            debug!(
8✔
1033
                                "Found block in canonical chain for batch response: {}",
×
1034
                                block_digest
1035
                            );
1036
                            break;
8✔
1037
                        }
2✔
1038
                    }
1039

1040
                    first_block_in_response
8✔
1041
                        .expect("existence of LUCA should have been established already.")
8✔
1042
                };
8✔
1043

8✔
1044
                debug!(
8✔
1045
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1046
                 Now building response from that height."
×
1047
                );
1048

1049
                // Get the relevant blocks, at most batch-size many, descending from the
1050
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1051
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1052
                let max_response_len = cmp::min(
8✔
1053
                    max_response_len,
8✔
1054
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1055
                );
8✔
1056
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1057
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1058

8✔
1059
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1060
                let response_start_height: u64 = first_block_in_response.into();
8✔
1061
                let mut i: u64 = 1;
8✔
1062
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1063
                    let block_height = response_start_height + i;
31✔
1064
                    match state
31✔
1065
                        .chain
31✔
1066
                        .archival_state()
31✔
1067
                        .archival_block_mmr
31✔
1068
                        .ammr()
31✔
1069
                        .try_get_leaf(block_height)
31✔
1070
                        .await
31✔
1071
                    {
1072
                        Some(digest) => {
23✔
1073
                            digests_of_returned_blocks.push(digest);
23✔
1074
                        }
23✔
1075
                        None => break,
8✔
1076
                    }
1077
                    i += 1;
23✔
1078
                }
1079

1080
                let mut returned_blocks: Vec<Block> =
8✔
1081
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1082
                for block_digest in digests_of_returned_blocks {
31✔
1083
                    let block = state
23✔
1084
                        .chain
23✔
1085
                        .archival_state()
23✔
1086
                        .get_block(block_digest)
23✔
1087
                        .await?
23✔
1088
                        .unwrap();
23✔
1089
                    returned_blocks.push(block);
23✔
1090
                }
1091

1092
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1093

1094
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1095
                drop(state);
8✔
1096

1097
                let Some(response) = response else {
8✔
1098
                    warn!("Unable to satisfy batch-block request");
×
1099
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1100
                        .await?;
×
1101
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1102
                };
1103

1104
                debug!("Returning {} blocks in batch response", response.len());
8✔
1105

1106
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1107
                peer.send(response).await?;
8✔
1108

1109
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1110
            }
1111
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1112
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1113

×
1114
                debug!(
×
1115
                    "handling block response batch with {} blocks",
×
1116
                    authenticated_blocks.len()
×
1117
                );
1118

1119
                // (Alan:) why is there even a minimum?
1120
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1121
                    warn!("Got smaller batch response than allowed");
×
1122
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1123
                        .await?;
×
1124
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1125
                }
×
1126

1127
                // Verify that we are in fact in syncing mode
1128
                // TODO: Separate peer messages into those allowed under syncing
1129
                // and those that are not
1130
                let Some(sync_anchor) = self
×
1131
                    .global_state_lock
×
1132
                    .lock_guard()
×
1133
                    .await
×
1134
                    .net
1135
                    .sync_anchor
1136
                    .clone()
×
1137
                else {
1138
                    warn!("Received a batch of blocks without being in syncing mode");
×
1139
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1140
                        .await?;
×
1141
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1142
                };
1143

1144
                // Verify that the response matches the current state
1145
                // We get the latest block from the DB here since this message is
1146
                // only valid for archival nodes.
1147
                let (first_block, _) = &authenticated_blocks[0];
×
1148
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1149
                let most_canonical_own_block_match: Option<Block> = self
×
1150
                    .global_state_lock
×
1151
                    .lock_guard()
×
1152
                    .await
×
1153
                    .chain
1154
                    .archival_state()
×
1155
                    .get_block(first_blocks_parent_digest)
×
1156
                    .await
×
1157
                    .expect("Block lookup must succeed");
×
1158
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1159
                    Some(block) => block,
×
1160
                    None => {
1161
                        warn!("Got batch response with invalid start block");
×
1162
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1163
                            .await?;
×
1164
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1165
                    }
1166
                };
1167

1168
                // Convert all blocks to Block objects
1169
                debug!(
×
1170
                    "Found own block of height {} to match received batch",
×
1171
                    most_canonical_own_block_match.kernel.header.height
×
1172
                );
1173
                let mut received_blocks = vec![];
×
1174
                for (t_block, membership_proof) in authenticated_blocks {
×
1175
                    let Ok(block) = Block::try_from(t_block) else {
×
1176
                        warn!("Received invalid transfer block from peer");
×
1177
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1178
                            .await?;
×
1179
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1180
                    };
1181

1182
                    if !membership_proof.verify(
×
1183
                        block.header().height.into(),
×
1184
                        block.hash(),
×
1185
                        &sync_anchor.block_mmr.peaks(),
×
1186
                        sync_anchor.block_mmr.num_leafs(),
×
1187
                    ) {
×
1188
                        warn!("Authentication of received block fails relative to anchor");
×
1189
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1190
                            .await?;
×
1191
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1192
                    }
×
1193

×
1194
                    received_blocks.push(block);
×
1195
                }
1196

1197
                // Get the latest block that we know of and handle all received blocks
1198
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1199
                    .await?;
×
1200

1201
                // Reward happens as part of `handle_blocks`.
1202

1203
                Ok(KEEP_CONNECTION_ALIVE)
×
1204
            }
1205
            PeerMessage::UnableToSatisfyBatchRequest => {
1206
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1207
                warn!(
×
1208
                    "Peer {} reports inability to satisfy batch request.",
×
1209
                    self.peer_address
1210
                );
1211

1212
                Ok(KEEP_CONNECTION_ALIVE)
×
1213
            }
1214
            PeerMessage::Handshake(_) => {
1215
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1216

×
1217
                // The handshake should have been sent during connection
×
1218
                // initialization. Here it is out of order at best, malicious at
×
1219
                // worst.
×
1220
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1221
                Ok(KEEP_CONNECTION_ALIVE)
×
1222
            }
1223
            PeerMessage::ConnectionStatus(_) => {
1224
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1225

×
1226
                // The connection status should have been sent during connection
×
1227
                // initialization. Here it is out of order at best, malicious at
×
1228
                // worst.
×
1229

×
1230
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1231
                Ok(KEEP_CONNECTION_ALIVE)
×
1232
            }
1233
            PeerMessage::Transaction(transaction) => {
7✔
1234
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
7✔
1235

7✔
1236
                debug!(
7✔
1237
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1238
                    transaction.kernel.inputs.len(),
×
1239
                    transaction.kernel.outputs.len(),
×
1240
                    transaction.kernel.mutator_set_hash
×
1241
                );
1242

1243
                let transaction: Transaction = (*transaction).into();
7✔
1244

7✔
1245
                // 1. If transaction is invalid, punish.
7✔
1246
                if !transaction.is_valid().await {
7✔
1247
                    warn!("Received invalid tx");
4✔
1248
                    self.punish(NegativePeerSanction::InvalidTransaction)
4✔
1249
                        .await?;
4✔
1250
                    return Ok(KEEP_CONNECTION_ALIVE);
4✔
1251
                }
2✔
1252

2✔
1253
                // 2. If transaction has coinbase, punish.
2✔
1254
                // Transactions received from peers have not been mined yet.
2✔
1255
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
2✔
1256
                if transaction.kernel.coinbase.is_some() {
2✔
1257
                    warn!("Received non-mined transaction with coinbase.");
×
1258
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1259
                        .await?;
×
1260
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1261
                }
2✔
1262

2✔
1263
                // 3. If negative fee, punish.
2✔
1264
                if transaction.kernel.fee.is_negative() {
2✔
1265
                    warn!("Received negative-fee transaction.");
×
1266
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1267
                        .await?;
×
1268
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1269
                }
2✔
1270

2✔
1271
                // 4. If transaction is already known, ignore.
2✔
1272
                if self
2✔
1273
                    .global_state_lock
2✔
1274
                    .lock_guard()
2✔
1275
                    .await
2✔
1276
                    .mempool
1277
                    .contains_with_higher_proof_quality(
1278
                        transaction.kernel.txid(),
2✔
1279
                        transaction.proof.proof_quality()?,
2✔
1280
                    )
1281
                {
1282
                    warn!("Received transaction that was already known");
×
1283

1284
                    // We received a transaction that we *probably* haven't requested.
1285
                    // Consider punishing here, if this is abused.
1286
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1287
                }
2✔
1288

1289
                // 5. if transaction is not confirmable, punish.
1290
                let (tip, mutator_set_accumulator_after) = {
2✔
1291
                    let state = self.global_state_lock.lock_guard().await;
2✔
1292

1293
                    (
2✔
1294
                        state.chain.light_state().hash(),
2✔
1295
                        state.chain.light_state().mutator_set_accumulator_after(),
2✔
1296
                    )
2✔
1297
                };
2✔
1298
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
2✔
1299
                    warn!(
×
1300
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
UNCOV
1301
                        transaction.kernel.txid()
×
1302
                    );
1303
                    // get fine-grained error code for informative logging
1304
                    let confirmability_error_code = transaction
×
1305
                        .kernel
×
1306
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1307
                    match confirmability_error_code {
×
1308
                        Ok(_) => unreachable!(),
×
1309
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1310
                            warn!("invalid removal record (at index {index})");
×
1311
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1312
                            let removal_record_error_code = invalid_removal_record
×
1313
                                .validate_inner(&mutator_set_accumulator_after);
×
1314
                            debug!(
×
UNCOV
1315
                                "Absolute index set of removal record {index}: {:?}",
×
1316
                                invalid_removal_record.absolute_indices
1317
                            );
1318
                            match removal_record_error_code {
×
UNCOV
1319
                                Ok(_) => unreachable!(),
×
1320
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
UNCOV
1321
                                    debug!("invalid because membership proof is missing");
×
1322
                                }
1323
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1324
                                    chunk_index,
×
1325
                                }) => {
×
UNCOV
1326
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1327
                                }
1328
                            };
1329
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1330
                                .await?;
×
UNCOV
1331
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1332
                        }
1333
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1334
                            warn!("duplicate inputs");
×
1335
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1336
                                .await?;
×
UNCOV
1337
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1338
                        }
1339
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1340
                            warn!("already spent input (at index {index})");
×
1341
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1342
                                .await?;
×
UNCOV
1343
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1344
                        }
1345
                    };
1346
                }
2✔
1347

2✔
1348
                // If transaction cannot be applied to mutator set, punish.
2✔
1349
                // I don't think this can happen when above checks pass but we include
2✔
1350
                // the check to ensure that transaction can be applied.
2✔
1351
                let ms_update = MutatorSetUpdate::new(
2✔
1352
                    transaction.kernel.inputs.clone(),
2✔
1353
                    transaction.kernel.outputs.clone(),
2✔
1354
                );
2✔
1355
                let can_apply = ms_update
2✔
1356
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
2✔
1357
                    .is_ok();
2✔
1358
                if !can_apply {
2✔
1359
                    warn!("Cannot apply transaction to current mutator set.");
×
1360
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1361
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1362
                        .await?;
×
UNCOV
1363
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1364
                }
2✔
1365

2✔
1366
                let tx_timestamp = transaction.kernel.timestamp;
2✔
1367

2✔
1368
                // 6. Ignore if transaction is too old
2✔
1369
                let now = self.now();
2✔
1370
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
2✔
1371
                    // TODO: Consider punishing here
1372
                    warn!("Received too old tx");
×
UNCOV
1373
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1374
                }
2✔
1375

2✔
1376
                // 7. Ignore if transaction is too far into the future
2✔
1377
                if tx_timestamp
2✔
1378
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
2✔
1379
                {
1380
                    // TODO: Consider punishing here
1381
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
UNCOV
1382
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1383
                }
2✔
1384

2✔
1385
                // Otherwise, relay to main
2✔
1386
                let pt2m_transaction = PeerTaskToMainTransaction {
2✔
1387
                    transaction,
2✔
1388
                    confirmable_for_block: tip,
2✔
1389
                };
2✔
1390
                self.to_main_tx
2✔
1391
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
2✔
1392
                    .await?;
2✔
1393

1394
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1395
            }
1396
            PeerMessage::TransactionNotification(tx_notification) => {
8✔
1397
                // addresses #457
8✔
1398
                // new scope for state read-lock to avoid holding across peer.send()
8✔
1399
                {
8✔
1400
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
8✔
1401

1402
                    // 1. Ignore if we already know this transaction, and
1403
                    // the proof quality is not higher than what we already know.
1404
                    let state = self.global_state_lock.lock_guard().await;
8✔
1405
                    let transaction_of_same_or_higher_proof_quality_is_known =
8✔
1406
                        state.mempool.contains_with_higher_proof_quality(
8✔
1407
                            tx_notification.txid,
8✔
1408
                            tx_notification.proof_quality,
8✔
1409
                        );
8✔
1410
                    if transaction_of_same_or_higher_proof_quality_is_known {
8✔
1411
                        debug!("transaction with same or higher proof quality was already known");
4✔
1412
                        return Ok(KEEP_CONNECTION_ALIVE);
4✔
1413
                    }
4✔
1414

4✔
1415
                    // Only accept transactions that do not require executing
4✔
1416
                    // `update`.
4✔
1417
                    if state
4✔
1418
                        .chain
4✔
1419
                        .light_state()
4✔
1420
                        .mutator_set_accumulator_after()
4✔
1421
                        .hash()
4✔
1422
                        != tx_notification.mutator_set_hash
4✔
1423
                    {
1424
                        debug!("transaction refers to non-canonical mutator set state");
2✔
1425
                        return Ok(KEEP_CONNECTION_ALIVE);
2✔
1426
                    }
2✔
1427
                }
2✔
1428

2✔
1429
                // 2. Request the actual `Transaction` from peer
2✔
1430
                debug!("requesting transaction from peer");
2✔
1431
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
2✔
1432
                    .await?;
2✔
1433

1434
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1435
            }
1436
            PeerMessage::TransactionRequest(transaction_identifier) => {
3✔
1437
                if let Some(transaction) = self
3✔
1438
                    .global_state_lock
3✔
1439
                    .lock_guard()
3✔
1440
                    .await
3✔
1441
                    .mempool
1442
                    .get(transaction_identifier)
3✔
1443
                {
1444
                    if let Ok(transfer_transaction) = transaction.try_into() {
×
1445
                        peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
×
UNCOV
1446
                            .await?;
×
1447
                    } else {
UNCOV
1448
                        warn!("Peer requested transaction that cannot be converted to transfer object");
×
1449
                    }
1450
                }
3✔
1451

1452
                Ok(KEEP_CONNECTION_ALIVE)
3✔
1453
            }
1454
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1455
                let verdict = self
1✔
1456
                    .global_state_lock
1✔
1457
                    .lock_guard()
1✔
1458
                    .await
1✔
1459
                    .favor_incoming_block_proposal(
1✔
1460
                        block_proposal_notification.height,
1✔
1461
                        block_proposal_notification.guesser_fee,
1✔
1462
                    );
1✔
1463
                match verdict {
1✔
1464
                    Ok(_) => {
1465
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1466
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1467
                        ))
1✔
1468
                        .await?
1✔
1469
                    }
UNCOV
1470
                    Err(reject_reason) => info!(
×
UNCOV
1471
                        "Got unfavorable block proposal notification from {} peer; rejecting. Reason:\n{reject_reason}",
×
1472
                        self.peer_address
1473
                    ),
1474
                }
1475

1476
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1477
            }
1478
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
5✔
1479
                let matching_proposal = self
5✔
1480
                    .global_state_lock
5✔
1481
                    .lock_guard()
5✔
1482
                    .await
5✔
1483
                    .mining_state
1484
                    .block_proposal
1485
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
5✔
1486
                    .map(|x| x.to_owned());
5✔
1487
                if let Some(proposal) = matching_proposal {
5✔
UNCOV
1488
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
UNCOV
1489
                        .await?;
×
1490
                } else {
1491
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
5✔
1492
                        .await?;
5✔
1493
                }
1494

1495
                Ok(KEEP_CONNECTION_ALIVE)
5✔
1496
            }
1497
            PeerMessage::BlockProposal(block) => {
2✔
1498
                info!("Got block proposal from peer.");
2✔
1499

1500
                let should_punish = {
2✔
1501
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
2✔
1502

1503
                    let (verdict, tip) = {
2✔
1504
                        let state = self.global_state_lock.lock_guard().await;
2✔
1505

1506
                        let verdict = state.favor_incoming_block_proposal(
2✔
1507
                            block.header().height,
2✔
1508
                            block.total_guesser_reward(),
2✔
1509
                        );
2✔
1510
                        let tip = state.chain.light_state().to_owned();
2✔
1511
                        (verdict, tip)
2✔
1512
                    };
1513

1514
                    if let Err(rejection_reason) = verdict {
2✔
UNCOV
1515
                        match rejection_reason {
×
1516
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
UNCOV
1517
                            BlockProposalRejectError::InsufficientFee { current, received }
×
UNCOV
1518
                                if Some(received) == current =>
×
UNCOV
1519
                            {
×
1520
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
UNCOV
1521
                                None
×
1522
                            }
1523
                            _ => {
1524
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1525
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1526
                            }
1527
                        }
1528
                    } else {
1529
                        // Verify validity and that proposal is child of current tip
1530
                        if block.is_valid(&tip, self.now()).await {
2✔
1531
                            None // all is well, no punishment.
2✔
1532
                        } else {
UNCOV
1533
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1534
                        }
1535
                    }
1536
                };
1537

1538
                if let Some(sanction) = should_punish {
2✔
UNCOV
1539
                    self.punish(sanction).await?;
×
1540
                } else {
1541
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
2✔
1542
                        .await?;
2✔
1543

1544
                    // Valuable, new, hard-to-produce information. Reward peer.
1545
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
2✔
1546
                }
1547

1548
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1549
            }
1550
        }
1551
    }
141✔
1552

1553
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1554
    ///
1555
    /// the channel could potentially fill up in which case the send() will
1556
    /// block until there is capacity.  we wrap the send() so we can log if
1557
    /// that ever happens to the extent it passes slow-scope threshold.
1558
    async fn send_to_main(
2✔
1559
        &self,
2✔
1560
        msg: PeerTaskToMain,
2✔
1561
        line: u32,
2✔
1562
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
2✔
1563
        // we measure across the send() in case the channel ever fills up.
2✔
1564
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
2✔
1565

2✔
1566
        self.to_main_tx.send(msg).await
2✔
1567
    }
2✔
1568

1569
    /// Handle message from main task. The boolean return value indicates if
1570
    /// the connection should be closed.
1571
    ///
1572
    /// Locking:
1573
    ///   * acquires `global_state_lock` for write via Self::punish()
UNCOV
1574
    async fn handle_main_task_message<S>(
×
UNCOV
1575
        &mut self,
×
UNCOV
1576
        msg: MainToPeerTask,
×
UNCOV
1577
        peer: &mut S,
×
UNCOV
1578
        peer_state_info: &mut MutablePeerState,
×
1579
    ) -> Result<bool>
×
1580
    where
×
1581
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
×
1582
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
×
1583
        <S as TryStream>::Error: std::error::Error,
×
1584
    {
×
1585
        debug!("Handling {} message from main in peer loop", msg.get_type());
×
1586
        match msg {
×
1587
            MainToPeerTask::Block(block) => {
×
1588
                // We don't currently differentiate whether a new block came from a peer, or from our
×
1589
                // own miner. It's always shared through this logic.
×
1590
                let new_block_height = block.kernel.header.height;
×
1591
                if new_block_height > peer_state_info.highest_shared_block_height {
×
1592
                    debug!("Sending PeerMessage::BlockNotification");
×
1593
                    peer_state_info.highest_shared_block_height = new_block_height;
×
1594
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
×
1595
                        .await?;
×
1596
                    debug!("Sent PeerMessage::BlockNotification");
×
1597
                }
×
1598
                Ok(KEEP_CONNECTION_ALIVE)
×
1599
            }
1600
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1601
                // Only ask one of the peers about the batch of blocks
×
1602
                if batch_block_request.peer_addr_target != self.peer_address {
×
1603
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1604
                }
×
1605

×
1606
                let max_response_len = std::cmp::min(
×
1607
                    STANDARD_BLOCK_BATCH_SIZE,
×
1608
                    self.global_state_lock.cli().sync_mode_threshold,
×
1609
                );
×
1610

×
1611
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1612
                    known_blocks: batch_block_request.known_blocks,
×
1613
                    max_response_len,
×
1614
                    anchor: batch_block_request.anchor_mmr,
×
1615
                }))
×
1616
                .await?;
×
1617

1618
                Ok(KEEP_CONNECTION_ALIVE)
×
1619
            }
1620
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1621
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
UNCOV
1622

×
1623
                if self.peer_address != socket_addr {
×
UNCOV
1624
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1625
                }
×
1626

×
1627
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1628
                    .await?;
×
1629

1630
                // If this peer failed the last synchronization attempt, we only
1631
                // sanction, we don't disconnect.
1632
                Ok(KEEP_CONNECTION_ALIVE)
×
1633
            }
1634
            MainToPeerTask::MakePeerDiscoveryRequest => {
UNCOV
1635
                peer.send(PeerMessage::PeerListRequest).await?;
×
UNCOV
1636
                Ok(KEEP_CONNECTION_ALIVE)
×
1637
            }
UNCOV
1638
            MainToPeerTask::Disconnect(peer_address) => {
×
UNCOV
1639
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1640

×
1641
                // Only disconnect from the peer the main task requested a disconnect for.
×
UNCOV
1642
                if peer_address != self.peer_address {
×
1643
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1644
                }
×
1645
                self.register_peer_disconnection().await;
×
1646

1647
                Ok(DISCONNECT_CONNECTION)
×
1648
            }
1649
            MainToPeerTask::DisconnectAll() => {
1650
                self.register_peer_disconnection().await;
×
1651

1652
                Ok(DISCONNECT_CONNECTION)
×
1653
            }
UNCOV
1654
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1655
                if target_socket_addr == self.peer_address {
×
UNCOV
1656
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1657
                }
×
UNCOV
1658
                Ok(KEEP_CONNECTION_ALIVE)
×
1659
            }
1660
            MainToPeerTask::TransactionNotification(transaction_notification) => {
×
1661
                debug!("Sending PeerMessage::TransactionNotification");
×
1662
                peer.send(PeerMessage::TransactionNotification(
×
1663
                    transaction_notification,
×
UNCOV
1664
                ))
×
1665
                .await?;
×
1666
                debug!("Sent PeerMessage::TransactionNotification");
×
1667
                Ok(KEEP_CONNECTION_ALIVE)
×
1668
            }
1669
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1670
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1671
                peer.send(PeerMessage::BlockProposalNotification(
×
1672
                    block_proposal_notification,
×
UNCOV
1673
                ))
×
1674
                .await?;
×
1675
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1676
                Ok(KEEP_CONNECTION_ALIVE)
×
1677
            }
1678
        }
1679
    }
×
1680

1681
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1682
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1683
    async fn run<S>(
43✔
1684
        &mut self,
43✔
1685
        mut peer: S,
43✔
1686
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
43✔
1687
        peer_state_info: &mut MutablePeerState,
43✔
1688
    ) -> Result<()>
43✔
1689
    where
43✔
1690
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
43✔
1691
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
43✔
1692
        <S as TryStream>::Error: std::error::Error,
43✔
1693
    {
43✔
1694
        dbg!("into the very PeerLoop loop");
43✔
1695
        loop {
1696
            select! {
145✔
1697
                // Handle peer messages
1698
                peer_message = peer.try_next() => {
145✔
1699
                    let peer_address = self.peer_address;
144✔
1700
                    let peer_message = match peer_message {
144✔
1701
                        Ok(message) => message,
144✔
UNCOV
1702
                        Err(err) => {
×
UNCOV
1703
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
UNCOV
1704
                            error!("{msg}. Error: {err}");
×
UNCOV
1705
                            bail!("{msg}. Closing connection.");
×
1706
                        }
1707
                    };
1708
                    let Some(peer_message) = peer_message else {
144✔
1709
                        info!("Peer {peer_address} closed connection.");
×
1710
                        break;
×
1711
                    };
1712

1713
                    let syncing =
144✔
1714
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
144✔
1715
                    let message_type = peer_message.get_type();
144✔
1716
                    if peer_message.ignore_during_sync() && syncing {
144✔
UNCOV
1717
                        debug!(
×
UNCOV
1718
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1719
                        );
UNCOV
1720
                        continue;
×
1721
                    }
144✔
1722
                    if peer_message.ignore_when_not_sync() && !syncing {
144✔
1723
                        debug!(
×
UNCOV
1724
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1725
                        );
UNCOV
1726
                        continue;
×
1727
                    }
144✔
1728

144✔
1729
                    match self
144✔
1730
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
144✔
1731
                        .await
144✔
1732
                    {
1733
                        Ok(false) => {}
102✔
1734
                        Ok(true) => {
1735
                            info!("Closing connection to {peer_address}");
38✔
1736
                            break;
38✔
1737
                        }
1738
                        Err(err) => {
1✔
1739
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1740
                            bail!("{err}");
1✔
1741
                        }
1742
                    };
1743
                }
1744

1745
                // Handle messages from main task
1746
                main_msg_res = from_main_rx.recv() => {
145✔
UNCOV
1747
                    let main_msg = main_msg_res
×
UNCOV
1748
                        .unwrap_or_else(|e| panic!("Failed to read from main loop: {e}"));
×
UNCOV
1749
                    let close_connection = self
×
UNCOV
1750
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
×
UNCOV
1751
                        .await
×
1752
                        .unwrap_or_else(|err| {
×
1753
                            warn!("handle_main_task_message returned an error: {err}");
×
1754
                            true
×
1755
                        });
×
1756

×
1757
                    if close_connection {
×
1758
                        info!(
×
1759
                            "handle_main_task_message is closing the connection to {}",
×
1760
                            self.peer_address
1761
                        );
1762
                        break;
×
1763
                    }
×
1764
                }
1765
            }
1766
        }
1767

1768
        Ok(())
38✔
1769
    }
39✔
1770

1771
    /// Function called before entering the peer loop. Reads the potentially stored
1772
    /// peer standing from the database and does other book-keeping before entering
1773
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1774
    /// accepted for a connection for this loop to be entered. So we don't need
1775
    /// to check the standing again.
1776
    ///
1777
    /// Locking:
1778
    ///   * acquires `global_state_lock` for write
1779
    pub(crate) async fn run_wrapper<S>(
36✔
1780
        &mut self,
36✔
1781
        mut peer: S,
36✔
1782
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
36✔
1783
    ) -> Result<()>
36✔
1784
    where
36✔
1785
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
36✔
1786
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
36✔
1787
        <S as TryStream>::Error: std::error::Error,
36✔
1788
    {
36✔
1789
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1790

1791
        let cli_args = self.global_state_lock.cli().clone();
36✔
1792
        let global_state = self.global_state_lock.lock_guard().await;
36✔
1793

1794
        let standing = global_state
36✔
1795
            .net
36✔
1796
            .peer_databases
36✔
1797
            .peer_standings
36✔
1798
            .get(self.peer_address.ip())
36✔
1799
            .await
36✔
1800
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
35✔
1801

35✔
1802
        // Add peer to peer map
35✔
1803
        let peer_connection_info = PeerConnectionInfo::new(
35✔
1804
            self.peer_handshake_data.listen_port,
35✔
1805
            self.peer_address,
35✔
1806
            self.inbound_connection,
35✔
1807
        );
35✔
1808
        let new_peer = PeerInfo::new(
35✔
1809
            peer_connection_info,
35✔
1810
            &self.peer_handshake_data,
35✔
1811
            SystemTime::now(),
35✔
1812
            cli_args.peer_tolerance,
35✔
1813
        )
35✔
1814
        .with_standing(standing);
35✔
1815

35✔
1816
        // If timestamps are different, we currently just log a warning.
35✔
1817
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
35✔
1818
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
35✔
1819
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
35✔
1820
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
35✔
1821
        {
UNCOV
1822
            let own_datetime_utc: DateTime<Utc> =
×
UNCOV
1823
                new_peer.own_timestamp_connection_established.into();
×
UNCOV
1824
            let peer_datetime_utc: DateTime<Utc> =
×
UNCOV
1825
                new_peer.peer_timestamp_connection_established.into();
×
UNCOV
1826
            warn!(
×
1827
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1828
                new_peer.connected_address(),
×
1829
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1830
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1831
        }
35✔
1832

1833
        // There is potential for a race-condition in the peer_map here, as we've previously
1834
        // counted the number of entries and checked if instance ID was already connected. But
1835
        // this check could have been invalidated by other tasks so we perform it again
1836

1837
        if global_state
35✔
1838
            .net
35✔
1839
            .peer_map
35✔
1840
            .values()
35✔
1841
            .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
35✔
1842
        {
UNCOV
1843
            bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1844
        }
35✔
1845

35✔
1846
        if global_state.net.peer_map.len() >= cli_args.max_num_peers {
35✔
UNCOV
1847
            bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1848
        }
35✔
1849

35✔
1850
        if global_state.net.peer_map.contains_key(&self.peer_address) {
35✔
1851
            // This shouldn't be possible, unless the peer reports a different instance ID than
1852
            // for the other connection. Only a malignant client would do that.
UNCOV
1853
            bail!("Already connected to peer. Aborting connection");
×
1854
        }
35✔
1855
        drop(global_state);
35✔
1856

35✔
1857
        self.global_state_lock
35✔
1858
            .lock_mut(|s| s.net.peer_map.insert(self.peer_address, new_peer))
35✔
1859
            .await;
35✔
1860

1861
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1862
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
35✔
1863

35✔
1864
        // If peer indicates more canonical block, request a block notification to catch up ASAP
35✔
1865
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
35✔
1866
            > self
35✔
1867
                .global_state_lock
35✔
1868
                .lock_guard()
35✔
1869
                .await
35✔
1870
                .chain
1871
                .light_state()
35✔
1872
                .kernel
1873
                .header
1874
                .cumulative_proof_of_work
1875
        {
1876
            // Send block notification request to catch up ASAP, in case we're
1877
            // behind the newly-connected peer.
UNCOV
1878
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1879
        }
35✔
1880

1881
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
35✔
1882
        debug!("Exited peer loop for {}", self.peer_address);
31✔
1883

1884
        close_peer_connected_callback(
31✔
1885
            self.global_state_lock.clone(),
31✔
1886
            self.peer_address,
31✔
1887
            &self.to_main_tx,
31✔
1888
        )
31✔
1889
        .await;
31✔
1890

1891
        debug!("Ending peer loop for {}", self.peer_address);
31✔
1892

1893
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1894
        // feature to have for testing purposes.
1895
        res
31✔
1896
    }
31✔
1897

1898
    /// Register graceful peer disconnection in the global state.
1899
    ///
1900
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1901
    ///
1902
    /// # Locking:
1903
    ///   * acquires `global_state_lock` for write
1904
    ///
1905
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
UNCOV
1906
    async fn register_peer_disconnection(&mut self) {
×
UNCOV
1907
        let peer_id = self.peer_handshake_data.instance_id;
×
UNCOV
1908
        self.global_state_lock
×
UNCOV
1909
            .lock_guard_mut()
×
UNCOV
1910
            .await
×
1911
            .net
1912
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1913
    }
×
1914
}
1915

1916
#[cfg(test)]
1917
mod peer_loop_tests {
1918
    use rand::rngs::StdRng;
1919
    use rand::Rng;
1920
    use rand::SeedableRng;
1921
    use tokio::sync::mpsc::error::TryRecvError;
1922
    use tracing_test::traced_test;
1923

1924
    use super::*;
1925
    use crate::config_models::cli_args;
1926
    use crate::config_models::network::Network;
1927
    use crate::job_queue::triton_vm::TritonVmJobQueue;
1928
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
1929
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1930
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1931
    use crate::models::peer::transaction_notification::TransactionNotification;
1932
    use crate::models::state::mempool::TransactionOrigin;
1933
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1934
    use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
1935
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1936
    use crate::tests::shared::fake_valid_block_for_tests;
1937
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1938
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1939
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1940
    use crate::tests::shared::get_dummy_socket_address;
1941
    use crate::tests::shared::get_test_genesis_setup;
1942
    use crate::tests::shared::Action;
1943
    use crate::tests::shared::Mock;
1944

UNCOV
1945
    #[traced_test]
×
1946
    #[tokio::test]
1947
    async fn test_peer_loop_bye() -> Result<()> {
1✔
1948
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
1949

1✔
1950
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
1951
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1✔
1952

1✔
1953
        let peer_address = get_dummy_socket_address(2);
1✔
1954
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
1955
        let mut peer_loop_handler =
1✔
1956
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1✔
1957
        peer_loop_handler
1✔
1958
            .run_wrapper(mock, from_main_rx_clone)
1✔
1959
            .await?;
1✔
1960

1✔
1961
        assert_eq!(
1✔
1962
            2,
1✔
1963
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
1964
            "peer map length must be back to 2 after goodbye"
1✔
1965
        );
1✔
1966

1✔
1967
        Ok(())
1✔
1968
    }
1✔
1969

UNCOV
1970
    #[traced_test]
×
1971
    #[tokio::test]
1972
    async fn test_peer_loop_peer_list() {
1✔
1973
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
1✔
1974
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
1✔
1975
                .await
1✔
1976
                .unwrap();
1✔
1977

1✔
1978
        let mut peer_infos = state_lock
1✔
1979
            .lock_guard()
1✔
1980
            .await
1✔
1981
            .net
1✔
1982
            .peer_map
1✔
1983
            .clone()
1✔
1984
            .into_values()
1✔
1985
            .collect::<Vec<_>>();
1✔
1986
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2✔
1987
        let (peer_address0, instance_id0) = (
1✔
1988
            peer_infos[0].connected_address(),
1✔
1989
            peer_infos[0].instance_id(),
1✔
1990
        );
1✔
1991
        let (peer_address1, instance_id1) = (
1✔
1992
            peer_infos[1].connected_address(),
1✔
1993
            peer_infos[1].instance_id(),
1✔
1994
        );
1✔
1995

1✔
1996
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
1✔
1997
        let expected_response = vec![
1✔
1998
            (peer_address0, instance_id0),
1✔
1999
            (peer_address1, instance_id1),
1✔
2000
            (sa2, hsd2.instance_id),
1✔
2001
        ];
1✔
2002
        let mock = Mock::new(vec![
1✔
2003
            Action::Read(PeerMessage::PeerListRequest),
1✔
2004
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
1✔
2005
            Action::Read(PeerMessage::Bye),
1✔
2006
        ]);
1✔
2007

1✔
2008
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2009

1✔
2010
        let mut peer_loop_handler =
1✔
2011
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
1✔
2012
        peer_loop_handler
1✔
2013
            .run_wrapper(mock, from_main_rx_clone)
1✔
2014
            .await
1✔
2015
            .unwrap();
1✔
2016

1✔
2017
        assert_eq!(
1✔
2018
            2,
1✔
2019
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
2020
            "peer map must have length 2 after saying goodbye to peer 2"
1✔
2021
        );
1✔
2022
    }
1✔
2023

UNCOV
2024
    #[traced_test]
×
2025
    #[tokio::test]
2026
    async fn different_genesis_test() -> Result<()> {
1✔
2027
        // In this scenario a peer provides another genesis block than what has been
1✔
2028
        // hardcoded. This should lead to the closing of the connection to this peer
1✔
2029
        // and a ban.
1✔
2030

1✔
2031
        let network = Network::Main;
1✔
2032
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2033
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2034
        assert_eq!(1000, state_lock.cli().peer_tolerance);
1✔
2035
        let peer_address = get_dummy_socket_address(0);
1✔
2036

1✔
2037
        // Although the database is empty, `get_latest_block` still returns the genesis block,
1✔
2038
        // since that block is hardcoded.
1✔
2039
        let mut different_genesis_block = state_lock
1✔
2040
            .lock_guard()
1✔
2041
            .await
1✔
2042
            .chain
1✔
2043
            .archival_state()
1✔
2044
            .get_tip()
1✔
2045
            .await;
1✔
2046

1✔
2047
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
1✔
2048
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
1✔
2049
            &different_genesis_block,
1✔
2050
            Timestamp::hours(1),
1✔
2051
            StdRng::seed_from_u64(5550001).random(),
1✔
2052
        )
1✔
2053
        .await;
1✔
2054
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
1✔
2055
            block_1_with_different_genesis.try_into().unwrap(),
1✔
2056
        )))]);
1✔
2057

1✔
2058
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2059
            to_main_tx.clone(),
1✔
2060
            state_lock.clone(),
1✔
2061
            peer_address,
1✔
2062
            hsd,
1✔
2063
            true,
1✔
2064
            1,
1✔
2065
        );
1✔
2066
        let res = peer_loop_handler
1✔
2067
            .run_wrapper(mock, from_main_rx_clone)
1✔
2068
            .await;
1✔
2069
        assert!(
1✔
2070
            res.is_err(),
1✔
2071
            "run_wrapper must return failure when genesis is different"
1✔
2072
        );
1✔
2073

1✔
2074
        match to_main_rx1.recv().await {
1✔
2075
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2076
            _ => bail!("Must receive remove of peer block max height"),
1✔
2077
        }
1✔
2078

1✔
2079
        // Verify that no further message was sent to main loop
1✔
2080
        match to_main_rx1.try_recv() {
1✔
2081
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2082
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2083
        };
1✔
2084

1✔
2085
        drop(to_main_tx);
1✔
2086

1✔
2087
        let peer_standing = state_lock
1✔
2088
            .lock_guard()
1✔
2089
            .await
1✔
2090
            .net
1✔
2091
            .get_peer_standing_from_database(peer_address.ip())
1✔
2092
            .await;
1✔
2093
        assert_eq!(
1✔
2094
            -i32::from(state_lock.cli().peer_tolerance),
1✔
2095
            peer_standing.unwrap().standing
1✔
2096
        );
1✔
2097
        assert_eq!(
1✔
2098
            NegativePeerSanction::DifferentGenesis,
1✔
2099
            peer_standing.unwrap().latest_punishment.unwrap().0
1✔
2100
        );
1✔
2101

1✔
2102
        Ok(())
1✔
2103
    }
1✔
2104

UNCOV
2105
    #[traced_test]
×
2106
    #[tokio::test]
2107
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
1✔
2108
    {
1✔
2109
        let args = cli_args::Args::default();
1✔
2110
        let network = args.network;
1✔
2111
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
1✔
2112
            get_test_genesis_setup(network, 0, args).await?;
1✔
2113

1✔
2114
        let peer_address = get_dummy_socket_address(0);
1✔
2115
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
1✔
2116
        let peer_id = peer_handshake_data.instance_id;
1✔
2117
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2118
            to_main_tx,
1✔
2119
            state_lock.clone(),
1✔
2120
            peer_address,
1✔
2121
            peer_handshake_data,
1✔
2122
            true,
1✔
2123
            1,
1✔
2124
        );
1✔
2125
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2126
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
1✔
2127

1✔
2128
        let global_state = state_lock.lock_guard().await;
1✔
2129
        assert!(global_state
1✔
2130
            .net
1✔
2131
            .last_disconnection_time_of_peer(peer_id)
1✔
2132
            .is_none());
1✔
2133

1✔
2134
        drop(to_main_rx);
1✔
2135
        drop(from_main_tx);
1✔
2136

1✔
2137
        Ok(())
1✔
2138
    }
1✔
2139

UNCOV
2140
    #[traced_test]
×
2141
    #[tokio::test]
2142
    async fn block_without_valid_pow_test() -> Result<()> {
1✔
2143
        // In this scenario, a block without a valid PoW is received. This block should be rejected
1✔
2144
        // by the peer loop and a notification should never reach the main loop.
1✔
2145

1✔
2146
        let network = Network::Main;
1✔
2147
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2148
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2149
        let peer_address = get_dummy_socket_address(0);
1✔
2150
        let genesis_block: Block = state_lock
1✔
2151
            .lock_guard()
1✔
2152
            .await
1✔
2153
            .chain
1✔
2154
            .archival_state()
1✔
2155
            .get_tip()
1✔
2156
            .await;
1✔
2157

1✔
2158
        // Make a with hash above what the implied threshold from
1✔
2159
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
1✔
2160
            &genesis_block,
1✔
2161
            Timestamp::hours(1),
1✔
2162
            StdRng::seed_from_u64(5550001).random(),
1✔
2163
        )
1✔
2164
        .await;
1✔
2165

1✔
2166
        // This *probably* is invalid PoW -- and needs to be for this test to
1✔
2167
        // work.
1✔
2168
        block_without_valid_pow.set_header_nonce(Digest::default());
1✔
2169

1✔
2170
        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
1✔
2171
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
1✔
2172
        let mock = Mock::new(vec![
1✔
2173
            Action::Read(PeerMessage::Block(Box::new(
1✔
2174
                block_without_valid_pow.clone().try_into().unwrap(),
1✔
2175
            ))),
1✔
2176
            Action::Read(PeerMessage::Bye),
1✔
2177
        ]);
1✔
2178

1✔
2179
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2180

1✔
2181
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2182
            to_main_tx.clone(),
1✔
2183
            state_lock.clone(),
1✔
2184
            peer_address,
1✔
2185
            hsd,
1✔
2186
            true,
1✔
2187
            1,
1✔
2188
            block_without_valid_pow.header().timestamp,
1✔
2189
        );
1✔
2190
        peer_loop_handler
1✔
2191
            .run_wrapper(mock, from_main_rx_clone)
1✔
2192
            .await
1✔
2193
            .expect("sending (one) invalid block should not result in closed connection");
1✔
2194

1✔
2195
        match to_main_rx1.recv().await {
1✔
2196
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2197
            _ => bail!("Must receive remove of peer block max height"),
1✔
2198
        }
1✔
2199

1✔
2200
        // Verify that no further message was sent to main loop
1✔
2201
        match to_main_rx1.try_recv() {
1✔
2202
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2203
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2204
        };
1✔
2205

1✔
2206
        // We need to have the transmitter in scope until we have received from it
1✔
2207
        // otherwise the receiver will report the disconnected error when we attempt
1✔
2208
        // to read from it. And the purpose is to verify that the channel is empty,
1✔
2209
        // not that it has been closed.
1✔
2210
        drop(to_main_tx);
1✔
2211

1✔
2212
        // Verify that peer standing was stored in database
1✔
2213
        let standing = state_lock
1✔
2214
            .lock_guard()
1✔
2215
            .await
1✔
2216
            .net
1✔
2217
            .peer_databases
1✔
2218
            .peer_standings
1✔
2219
            .get(peer_address.ip())
1✔
2220
            .await
1✔
2221
            .unwrap();
1✔
2222
        assert!(
1✔
2223
            standing.standing < 0,
1✔
2224
            "Peer must be sanctioned for sending a bad block"
1✔
2225
        );
1✔
2226

1✔
2227
        Ok(())
1✔
2228
    }
1✔
2229

UNCOV
2230
    #[traced_test]
×
2231
    #[tokio::test]
2232
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
1✔
2233
        // The scenario tested here is that a client receives a block that is already
1✔
2234
        // known and stored. The expected behavior is to ignore the block and not send
1✔
2235
        // a message to the main task.
1✔
2236

1✔
2237
        let network = Network::Main;
1✔
2238
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
1✔
2239
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2240
        let peer_address = get_dummy_socket_address(0);
1✔
2241
        let genesis_block: Block = Block::genesis(network);
1✔
2242

1✔
2243
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
2244
        let block_1 =
1✔
2245
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
1✔
2246
        assert!(
1✔
2247
            block_1.is_valid(&genesis_block, now).await,
1✔
2248
            "Block must be valid for this test to make sense"
1✔
2249
        );
1✔
2250
        alice.set_new_tip(block_1.clone()).await?;
1✔
2251

1✔
2252
        let mock_peer_messages = Mock::new(vec![
1✔
2253
            Action::Read(PeerMessage::Block(Box::new(
1✔
2254
                block_1.clone().try_into().unwrap(),
1✔
2255
            ))),
1✔
2256
            Action::Read(PeerMessage::Bye),
1✔
2257
        ]);
1✔
2258

1✔
2259
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2260

1✔
2261
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2262
            to_main_tx.clone(),
1✔
2263
            alice.clone(),
1✔
2264
            peer_address,
1✔
2265
            hsd,
1✔
2266
            false,
1✔
2267
            1,
1✔
2268
            block_1.header().timestamp,
1✔
2269
        );
1✔
2270
        alice_peer_loop_handler
1✔
2271
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
1✔
2272
            .await?;
1✔
2273

1✔
2274
        match to_main_rx1.recv().await {
1✔
2275
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2276
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
1✔
2277
        }
1✔
2278
        match to_main_rx1.try_recv() {
1✔
2279
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2280
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2281
        };
1✔
2282
        drop(to_main_tx);
1✔
2283

1✔
2284
        if !alice.lock_guard().await.net.peer_map.is_empty() {
1✔
2285
            bail!("peer map must be empty after closing connection gracefully");
1✔
2286
        }
1✔
2287

1✔
2288
        Ok(())
1✔
2289
    }
1✔
2290

UNCOV
2291
    #[traced_test]
×
2292
    #[tokio::test]
2293
    async fn block_request_batch_simple() {
1✔
2294
        // Scenario: Six blocks (including genesis) are known. Peer requests
1✔
2295
        // from all possible starting points, and client responds with the
1✔
2296
        // correct list of blocks.
1✔
2297
        let network = Network::Main;
1✔
2298
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2299
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2300
                .await
1✔
2301
                .unwrap();
1✔
2302
        let genesis_block: Block = Block::genesis(network);
1✔
2303
        let peer_address = get_dummy_socket_address(0);
1✔
2304
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
2305
            fake_valid_sequence_of_blocks_for_tests(
1✔
2306
                &genesis_block,
1✔
2307
                Timestamp::hours(1),
1✔
2308
                StdRng::seed_from_u64(5550001).random(),
1✔
2309
            )
1✔
2310
            .await;
1✔
2311
        let blocks = vec![
1✔
2312
            genesis_block,
1✔
2313
            block_1,
1✔
2314
            block_2,
1✔
2315
            block_3,
1✔
2316
            block_4,
1✔
2317
            block_5.clone(),
1✔
2318
        ];
1✔
2319
        for block in blocks.iter().skip(1) {
5✔
2320
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
5✔
2321
        }
1✔
2322

1✔
2323
        let mmra = state_lock
1✔
2324
            .lock_guard()
1✔
2325
            .await
1✔
2326
            .chain
1✔
2327
            .archival_state()
1✔
2328
            .archival_block_mmr
1✔
2329
            .ammr()
1✔
2330
            .to_accumulator_async()
1✔
2331
            .await;
1✔
2332
        for i in 0..=4 {
6✔
2333
            let expected_response = {
5✔
2334
                let state = state_lock.lock_guard().await;
5✔
2335
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
5✔
2336
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
5✔
2337
                    .await
5✔
2338
                    .unwrap()
5✔
2339
            };
5✔
2340
            let mock = Mock::new(vec![
5✔
2341
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
5✔
2342
                    known_blocks: vec![blocks[i].hash()],
5✔
2343
                    max_response_len: 14,
5✔
2344
                    anchor: mmra.clone(),
5✔
2345
                })),
5✔
2346
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
5✔
2347
                Action::Read(PeerMessage::Bye),
5✔
2348
            ]);
5✔
2349
            let mut peer_loop_handler = PeerLoopHandler::new(
5✔
2350
                to_main_tx.clone(),
5✔
2351
                state_lock.clone(),
5✔
2352
                peer_address,
5✔
2353
                hsd.clone(),
5✔
2354
                false,
5✔
2355
                1,
5✔
2356
            );
5✔
2357

5✔
2358
            peer_loop_handler
5✔
2359
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
5✔
2360
                .await
5✔
2361
                .unwrap();
5✔
2362
        }
1✔
2363
    }
1✔
2364

UNCOV
2365
    #[traced_test]
×
2366
    #[tokio::test]
2367
    async fn block_request_batch_in_order_test() -> Result<()> {
1✔
2368
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2369
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
1✔
2370
        // are returned.
1✔
2371

1✔
2372
        let network = Network::Main;
1✔
2373
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2374
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2375
        let genesis_block: Block = Block::genesis(network);
1✔
2376
        let peer_address = get_dummy_socket_address(0);
1✔
2377
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2378
            &genesis_block,
1✔
2379
            Timestamp::hours(1),
1✔
2380
            StdRng::seed_from_u64(5550001).random(),
1✔
2381
        )
1✔
2382
        .await;
1✔
2383
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2384
            &block_1,
1✔
2385
            Timestamp::hours(1),
1✔
2386
            StdRng::seed_from_u64(5550002).random(),
1✔
2387
        )
1✔
2388
        .await;
1✔
2389
        assert_ne!(block_2_b.hash(), block_2_a.hash());
1✔
2390

1✔
2391
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2392
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2393
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2394
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2395
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2396

1✔
2397
        let anchor = state_lock
1✔
2398
            .lock_guard()
1✔
2399
            .await
1✔
2400
            .chain
1✔
2401
            .archival_state()
1✔
2402
            .archival_block_mmr
1✔
2403
            .ammr()
1✔
2404
            .to_accumulator_async()
1✔
2405
            .await;
1✔
2406
        let response_1 = {
1✔
2407
            let state_lock = state_lock.lock_guard().await;
1✔
2408
            PeerLoopHandler::batch_response(
1✔
2409
                &state_lock,
1✔
2410
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
1✔
2411
                &anchor,
1✔
2412
            )
1✔
2413
            .await
1✔
2414
            .unwrap()
1✔
2415
        };
1✔
2416

1✔
2417
        let mut mock = Mock::new(vec![
1✔
2418
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2419
                known_blocks: vec![genesis_block.hash()],
1✔
2420
                max_response_len: 14,
1✔
2421
                anchor: anchor.clone(),
1✔
2422
            })),
1✔
2423
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
1✔
2424
            Action::Read(PeerMessage::Bye),
1✔
2425
        ]);
1✔
2426

1✔
2427
        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
1✔
2428
            to_main_tx.clone(),
1✔
2429
            state_lock.clone(),
1✔
2430
            peer_address,
1✔
2431
            hsd.clone(),
1✔
2432
            false,
1✔
2433
            1,
1✔
2434
            block_3_a.header().timestamp,
1✔
2435
        );
1✔
2436

1✔
2437
        peer_loop_handler_1
1✔
2438
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
1✔
2439
            .await?;
1✔
2440

1✔
2441
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2442
        let response_2 = {
1✔
2443
            let state_lock = state_lock.lock_guard().await;
1✔
2444
            PeerLoopHandler::batch_response(
1✔
2445
                &state_lock,
1✔
2446
                vec![block_2_a, block_3_a.clone()],
1✔
2447
                &anchor,
1✔
2448
            )
1✔
2449
            .await
1✔
2450
            .unwrap()
1✔
2451
        };
1✔
2452
        mock = Mock::new(vec![
1✔
2453
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2454
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
1✔
2455
                max_response_len: 14,
1✔
2456
                anchor,
1✔
2457
            })),
1✔
2458
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
1✔
2459
            Action::Read(PeerMessage::Bye),
1✔
2460
        ]);
1✔
2461

1✔
2462
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2463
            to_main_tx.clone(),
1✔
2464
            state_lock.clone(),
1✔
2465
            peer_address,
1✔
2466
            hsd,
1✔
2467
            false,
1✔
2468
            1,
1✔
2469
            block_3_a.header().timestamp,
1✔
2470
        );
1✔
2471

1✔
2472
        peer_loop_handler_2
1✔
2473
            .run_wrapper(mock, from_main_rx_clone)
1✔
2474
            .await?;
1✔
2475

1✔
2476
        Ok(())
1✔
2477
    }
1✔
2478

UNCOV
2479
    #[traced_test]
×
2480
    #[tokio::test]
2481
    async fn block_request_batch_out_of_order_test() -> Result<()> {
1✔
2482
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2483
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
1✔
2484
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
1✔
2485
        // The blocks will be supplied in the correct order but starting from the first digest in
1✔
2486
        // the list that is known and canonical.
1✔
2487

1✔
2488
        let network = Network::Main;
1✔
2489
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2490
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2491
        let genesis_block = Block::genesis(network);
1✔
2492
        let peer_address = get_dummy_socket_address(0);
1✔
2493
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2494
            &genesis_block,
1✔
2495
            Timestamp::hours(1),
1✔
2496
            StdRng::seed_from_u64(5550001).random(),
1✔
2497
        )
1✔
2498
        .await;
1✔
2499
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2500
            &block_1,
1✔
2501
            Timestamp::hours(1),
1✔
2502
            StdRng::seed_from_u64(5550002).random(),
1✔
2503
        )
1✔
2504
        .await;
1✔
2505
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2506

1✔
2507
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2508
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2509
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2510
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2511
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2512

1✔
2513
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2514
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
1✔
2515
        expected_anchor.append(block_3_a.hash());
1✔
2516
        let state_anchor = state_lock
1✔
2517
            .lock_guard()
1✔
2518
            .await
1✔
2519
            .chain
1✔
2520
            .archival_state()
1✔
2521
            .archival_block_mmr
1✔
2522
            .ammr()
1✔
2523
            .to_accumulator_async()
1✔
2524
            .await;
1✔
2525
        assert_eq!(
1✔
2526
            expected_anchor, state_anchor,
1✔
2527
            "Catching assumption about MMRA in tip and in archival state"
1✔
2528
        );
1✔
2529

1✔
2530
        let response = {
1✔
2531
            let state_lock = state_lock.lock_guard().await;
1✔
2532
            PeerLoopHandler::batch_response(
1✔
2533
                &state_lock,
1✔
2534
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
1✔
2535
                &expected_anchor,
1✔
2536
            )
1✔
2537
            .await
1✔
2538
            .unwrap()
1✔
2539
        };
1✔
2540
        let mock = Mock::new(vec![
1✔
2541
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2542
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
1✔
2543
                max_response_len: 14,
1✔
2544
                anchor: expected_anchor,
1✔
2545
            })),
1✔
2546
            // Since genesis block is the 1st known in the list of known blocks,
1✔
2547
            // it's immediate descendent, block_1, is the first one returned.
1✔
2548
            Action::Write(PeerMessage::BlockResponseBatch(response)),
1✔
2549
            Action::Read(PeerMessage::Bye),
1✔
2550
        ]);
1✔
2551

1✔
2552
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2553
            to_main_tx.clone(),
1✔
2554
            state_lock.clone(),
1✔
2555
            peer_address,
1✔
2556
            hsd,
1✔
2557
            false,
1✔
2558
            1,
1✔
2559
            block_3_a.header().timestamp,
1✔
2560
        );
1✔
2561

1✔
2562
        peer_loop_handler_2
1✔
2563
            .run_wrapper(mock, from_main_rx_clone)
1✔
2564
            .await?;
1✔
2565

1✔
2566
        Ok(())
1✔
2567
    }
1✔
2568

UNCOV
2569
    #[traced_test]
×
2570
    #[tokio::test]
2571
    async fn request_unknown_height_doesnt_crash() {
1✔
2572
        // Scenario: Only genesis block is known. Peer requests block of height
1✔
2573
        // 2.
1✔
2574
        let network = Network::Main;
1✔
2575
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2576
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2577
                .await
1✔
2578
                .unwrap();
1✔
2579
        let peer_address = get_dummy_socket_address(0);
1✔
2580
        let mock = Mock::new(vec![
1✔
2581
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2582
            Action::Read(PeerMessage::Bye),
1✔
2583
        ]);
1✔
2584

1✔
2585
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2586
            to_main_tx.clone(),
1✔
2587
            state_lock.clone(),
1✔
2588
            peer_address,
1✔
2589
            hsd,
1✔
2590
            false,
1✔
2591
            1,
1✔
2592
        );
1✔
2593

1✔
2594
        // This will return error if seen read/write order does not match that of the
1✔
2595
        // mocked object.
1✔
2596
        peer_loop_handler
1✔
2597
            .run_wrapper(mock, from_main_rx_clone)
1✔
2598
            .await
1✔
2599
            .unwrap();
1✔
2600

1✔
2601
        // Verify that peer is sanctioned for this nonsense.
1✔
2602
        assert!(state_lock
1✔
2603
            .lock_guard()
1✔
2604
            .await
1✔
2605
            .net
1✔
2606
            .get_peer_standing_from_database(peer_address.ip())
1✔
2607
            .await
1✔
2608
            .unwrap()
1✔
2609
            .standing
1✔
2610
            .is_negative());
1✔
2611
    }
1✔
2612

UNCOV
2613
    #[traced_test]
×
2614
    #[tokio::test]
2615
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
1✔
2616
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2617
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
1✔
2618
        // returned.
1✔
2619

1✔
2620
        let network = Network::Main;
1✔
2621
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2622
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2623
        let genesis_block = Block::genesis(network);
1✔
2624
        let peer_address = get_dummy_socket_address(0);
1✔
2625

1✔
2626
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2627
            &genesis_block,
1✔
2628
            Timestamp::hours(1),
1✔
2629
            StdRng::seed_from_u64(5550001).random(),
1✔
2630
        )
1✔
2631
        .await;
1✔
2632
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2633
            &block_1,
1✔
2634
            Timestamp::hours(1),
1✔
2635
            StdRng::seed_from_u64(5550002).random(),
1✔
2636
        )
1✔
2637
        .await;
1✔
2638
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2639

1✔
2640
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2641
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2642
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2643
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2644
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2645

1✔
2646
        let mock = Mock::new(vec![
1✔
2647
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2648
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
1✔
2649
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
1✔
2650
            Action::Write(PeerMessage::Block(Box::new(
1✔
2651
                block_3_a.clone().try_into().unwrap(),
1✔
2652
            ))),
1✔
2653
            Action::Read(PeerMessage::Bye),
1✔
2654
        ]);
1✔
2655

1✔
2656
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2657
            to_main_tx.clone(),
1✔
2658
            state_lock.clone(),
1✔
2659
            peer_address,
1✔
2660
            hsd,
1✔
2661
            false,
1✔
2662
            1,
1✔
2663
            block_3_a.header().timestamp,
1✔
2664
        );
1✔
2665

1✔
2666
        // This will return error if seen read/write order does not match that of the
1✔
2667
        // mocked object.
1✔
2668
        peer_loop_handler
1✔
2669
            .run_wrapper(mock, from_main_rx_clone)
1✔
2670
            .await?;
1✔
2671

1✔
2672
        Ok(())
1✔
2673
    }
1✔
2674

UNCOV
2675
    #[traced_test]
×
2676
    #[tokio::test]
2677
    async fn receival_of_block_notification_height_1() {
1✔
2678
        // Scenario: client only knows genesis block. Then receives block
1✔
2679
        // notification of height 1. Must request block 1.
1✔
2680
        let network = Network::Main;
1✔
2681
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2682
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
1✔
2683
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2684
                .await
1✔
2685
                .unwrap();
1✔
2686
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2687
        let notification_height1 = (&block_1).into();
1✔
2688
        let mock = Mock::new(vec![
1✔
2689
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
1✔
2690
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
1✔
2691
            Action::Read(PeerMessage::Bye),
1✔
2692
        ]);
1✔
2693

1✔
2694
        let peer_address = get_dummy_socket_address(0);
1✔
2695
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2696
            to_main_tx.clone(),
1✔
2697
            state_lock.clone(),
1✔
2698
            peer_address,
1✔
2699
            hsd,
1✔
2700
            false,
1✔
2701
            1,
1✔
2702
            block_1.header().timestamp,
1✔
2703
        );
1✔
2704
        peer_loop_handler
1✔
2705
            .run_wrapper(mock, from_main_rx_clone)
1✔
2706
            .await
1✔
2707
            .unwrap();
1✔
2708

1✔
2709
        drop(to_main_rx1);
1✔
2710
    }
1✔
2711

UNCOV
2712
    #[traced_test]
×
2713
    #[tokio::test]
2714
    async fn receive_block_request_by_height_block_7() {
1✔
2715
        // Scenario: client only knows blocks up to height 7. Then receives block-
1✔
2716
        // request-by-height for height 7. Must respond with block 7.
1✔
2717
        let network = Network::Main;
1✔
2718
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2719
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
1✔
2720
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2721
                .await
1✔
2722
                .unwrap();
1✔
2723
        let genesis_block = Block::genesis(network);
1✔
2724
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
1✔
2725
            &genesis_block,
1✔
2726
            Timestamp::hours(1),
1✔
2727
            rng.random(),
1✔
2728
        )
1✔
2729
        .await;
1✔
2730
        let block7 = blocks.last().unwrap().to_owned();
1✔
2731
        let tip_height: u64 = block7.header().height.into();
1✔
2732
        assert_eq!(7, tip_height);
1✔
2733

1✔
2734
        for block in &blocks {
8✔
2735
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
7✔
2736
        }
1✔
2737

1✔
2738
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
1✔
2739
        let mock = Mock::new(vec![
1✔
2740
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
1✔
2741
            Action::Write(block7_response),
1✔
2742
            Action::Read(PeerMessage::Bye),
1✔
2743
        ]);
1✔
2744

1✔
2745
        let peer_address = get_dummy_socket_address(0);
1✔
2746
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2747
            to_main_tx.clone(),
1✔
2748
            state_lock.clone(),
1✔
2749
            peer_address,
1✔
2750
            hsd,
1✔
2751
            false,
1✔
2752
            1,
1✔
2753
        );
1✔
2754
        peer_loop_handler
1✔
2755
            .run_wrapper(mock, from_main_rx_clone)
1✔
2756
            .await
1✔
2757
            .unwrap();
1✔
2758

1✔
2759
        drop(to_main_rx1);
1✔
2760
    }
1✔
2761

UNCOV
2762
    #[traced_test]
×
2763
    #[tokio::test]
2764
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
1✔
2765
        // Scenario: client only knows genesis block. Then receives block 1.
1✔
2766

1✔
2767
        let network = Network::Main;
1✔
2768
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2769
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2770
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2771
        let peer_address = get_dummy_socket_address(0);
1✔
2772

1✔
2773
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2774
        let mock = Mock::new(vec![
1✔
2775
            Action::Read(PeerMessage::Block(Box::new(
1✔
2776
                block_1.clone().try_into().unwrap(),
1✔
2777
            ))),
1✔
2778
            Action::Read(PeerMessage::Bye),
1✔
2779
        ]);
1✔
2780

1✔
2781
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2782
            to_main_tx.clone(),
1✔
2783
            state_lock.clone(),
1✔
2784
            peer_address,
1✔
2785
            hsd,
1✔
2786
            false,
1✔
2787
            1,
1✔
2788
            block_1.header().timestamp,
1✔
2789
        );
1✔
2790
        peer_loop_handler
1✔
2791
            .run_wrapper(mock, from_main_rx_clone)
1✔
2792
            .await?;
1✔
2793

1✔
2794
        // Verify that a block was sent to `main_loop`
1✔
2795
        match to_main_rx1.recv().await {
1✔
2796
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
1✔
2797
            _ => bail!("Did not find msg sent to main task"),
1✔
2798
        };
1✔
2799

1✔
2800
        match to_main_rx1.recv().await {
1✔
2801
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2802
            _ => bail!("Must receive remove of peer block max height"),
1✔
2803
        }
1✔
2804

1✔
2805
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2806
            bail!("peer map must be empty after closing connection gracefully");
1✔
2807
        }
1✔
2808

1✔
2809
        Ok(())
1✔
2810
    }
1✔
2811

UNCOV
2812
    #[traced_test]
×
2813
    #[tokio::test]
2814
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
1✔
2815
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
2816
        // receives block 2, meaning that block 1 will have to be requested.
1✔
2817

1✔
2818
        let network = Network::Main;
1✔
2819
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2820
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2821
        let peer_address = get_dummy_socket_address(0);
1✔
2822
        let genesis_block: Block = state_lock
1✔
2823
            .lock_guard()
1✔
2824
            .await
1✔
2825
            .chain
1✔
2826
            .archival_state()
1✔
2827
            .get_tip()
1✔
2828
            .await;
1✔
2829
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
1✔
2830
            &genesis_block,
1✔
2831
            Timestamp::hours(1),
1✔
2832
            StdRng::seed_from_u64(5550001).random(),
1✔
2833
        )
1✔
2834
        .await;
1✔
2835

1✔
2836
        let mock = Mock::new(vec![
1✔
2837
            Action::Read(PeerMessage::Block(Box::new(
1✔
2838
                block_2.clone().try_into().unwrap(),
1✔
2839
            ))),
1✔
2840
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
2841
            Action::Read(PeerMessage::Block(Box::new(
1✔
2842
                block_1.clone().try_into().unwrap(),
1✔
2843
            ))),
1✔
2844
            Action::Read(PeerMessage::Bye),
1✔
2845
        ]);
1✔
2846

1✔
2847
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2848
            to_main_tx.clone(),
1✔
2849
            state_lock.clone(),
1✔
2850
            peer_address,
1✔
2851
            hsd,
1✔
2852
            true,
1✔
2853
            1,
1✔
2854
            block_2.header().timestamp,
1✔
2855
        );
1✔
2856
        peer_loop_handler
1✔
2857
            .run_wrapper(mock, from_main_rx_clone)
1✔
2858
            .await?;
1✔
2859

1✔
2860
        match to_main_rx1.recv().await {
1✔
2861
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
2862
                if blocks[0].hash() != block_1.hash() {
1✔
2863
                    bail!("1st received block by main loop must be block 1");
1✔
2864
                }
1✔
2865
                if blocks[1].hash() != block_2.hash() {
1✔
2866
                    bail!("2nd received block by main loop must be block 2");
1✔
2867
                }
1✔
2868
            }
1✔
2869
            _ => bail!("Did not find msg sent to main task 1"),
1✔
2870
        };
1✔
2871
        match to_main_rx1.recv().await {
1✔
2872
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2873
            _ => bail!("Must receive remove of peer block max height"),
1✔
2874
        }
1✔
2875

1✔
2876
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2877
            bail!("peer map must be empty after closing connection gracefully");
1✔
2878
        }
1✔
2879

1✔
2880
        Ok(())
1✔
2881
    }
1✔
2882

UNCOV
2883
    #[traced_test]
×
2884
    #[tokio::test]
2885
    async fn prevent_ram_exhaustion_test() -> Result<()> {
1✔
2886
        // In this scenario the peer sends more blocks than the client allows to store in the
1✔
2887
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
1✔
2888
        // process as the alternative is that the program will crash because it runs out of RAM.
1✔
2889

1✔
2890
        let network = Network::Main;
1✔
2891
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2892
        let (
1✔
2893
            _peer_broadcast_tx,
1✔
2894
            from_main_rx_clone,
1✔
2895
            to_main_tx,
1✔
2896
            mut to_main_rx1,
1✔
2897
            mut state_lock,
1✔
2898
            _hsd,
1✔
2899
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
2900
        let genesis_block = Block::genesis(network);
1✔
2901

1✔
2902
        // Restrict max number of blocks held in memory to 2.
1✔
2903
        let mut cli = state_lock.cli().clone();
1✔
2904
        cli.sync_mode_threshold = 2;
1✔
2905
        state_lock.set_cli(cli).await;
1✔
2906

1✔
2907
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
1✔
2908
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2909
            &genesis_block,
1✔
2910
            Timestamp::hours(1),
1✔
2911
            rng.random(),
1✔
2912
        )
1✔
2913
        .await;
1✔
2914
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2915

1✔
2916
        let mock = Mock::new(vec![
1✔
2917
            Action::Read(PeerMessage::Block(Box::new(
1✔
2918
                block_4.clone().try_into().unwrap(),
1✔
2919
            ))),
1✔
2920
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
2921
            Action::Read(PeerMessage::Block(Box::new(
1✔
2922
                block_3.clone().try_into().unwrap(),
1✔
2923
            ))),
1✔
2924
            Action::Read(PeerMessage::Bye),
1✔
2925
        ]);
1✔
2926

1✔
2927
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2928
            to_main_tx.clone(),
1✔
2929
            state_lock.clone(),
1✔
2930
            peer_address1,
1✔
2931
            hsd1,
1✔
2932
            true,
1✔
2933
            1,
1✔
2934
            block_4.header().timestamp,
1✔
2935
        );
1✔
2936
        peer_loop_handler
1✔
2937
            .run_wrapper(mock, from_main_rx_clone)
1✔
2938
            .await?;
1✔
2939

1✔
2940
        match to_main_rx1.recv().await {
1✔
2941
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2942
            _ => bail!("Must receive remove of peer block max height"),
1✔
2943
        }
1✔
2944

1✔
2945
        // Verify that no block is sent to main loop.
1✔
2946
        match to_main_rx1.try_recv() {
1✔
2947
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2948
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
1✔
2949
        };
1✔
2950
        drop(to_main_tx);
1✔
2951

1✔
2952
        // Verify that peer is sanctioned for failed fork reconciliation attempt
1✔
2953
        assert!(state_lock
1✔
2954
            .lock_guard()
1✔
2955
            .await
1✔
2956
            .net
1✔
2957
            .get_peer_standing_from_database(peer_address1.ip())
1✔
2958
            .await
1✔
2959
            .unwrap()
1✔
2960
            .standing
1✔
2961
            .is_negative());
1✔
2962

1✔
2963
        Ok(())
1✔
2964
    }
1✔
2965

UNCOV
2966
    #[traced_test]
×
2967
    #[tokio::test]
2968
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
1✔
2969
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
2970
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
1✔
2971

1✔
2972
        let network = Network::Main;
1✔
2973
        let (
1✔
2974
            _peer_broadcast_tx,
1✔
2975
            from_main_rx_clone,
1✔
2976
            to_main_tx,
1✔
2977
            mut to_main_rx1,
1✔
2978
            mut state_lock,
1✔
2979
            hsd,
1✔
2980
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2981
            .await
1✔
2982
            .unwrap();
1✔
2983
        let peer_address: SocketAddr = get_dummy_socket_address(0);
1✔
2984
        let genesis_block = Block::genesis(network);
1✔
2985
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2986
            &genesis_block,
1✔
2987
            Timestamp::hours(1),
1✔
2988
            StdRng::seed_from_u64(5550001).random(),
1✔
2989
        )
1✔
2990
        .await;
1✔
2991
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
1✔
2992

1✔
2993
        let mock = Mock::new(vec![
1✔
2994
            Action::Read(PeerMessage::Block(Box::new(
1✔
2995
                block_4.clone().try_into().unwrap(),
1✔
2996
            ))),
1✔
2997
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
2998
            Action::Read(PeerMessage::Block(Box::new(
1✔
2999
                block_3.clone().try_into().unwrap(),
1✔
3000
            ))),
1✔
3001
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3002
            Action::Read(PeerMessage::Block(Box::new(
1✔
3003
                block_2.clone().try_into().unwrap(),
1✔
3004
            ))),
1✔
3005
            Action::Read(PeerMessage::Bye),
1✔
3006
        ]);
1✔
3007

1✔
3008
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3009
            to_main_tx.clone(),
1✔
3010
            state_lock.clone(),
1✔
3011
            peer_address,
1✔
3012
            hsd,
1✔
3013
            true,
1✔
3014
            1,
1✔
3015
            block_4.header().timestamp,
1✔
3016
        );
1✔
3017
        peer_loop_handler
1✔
3018
            .run_wrapper(mock, from_main_rx_clone)
1✔
3019
            .await
1✔
3020
            .unwrap();
1✔
3021

1✔
3022
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
1✔
3023
            panic!("Did not find msg sent to main task");
1✔
3024
        };
1✔
3025
        assert_eq!(blocks[0].hash(), block_2.hash());
1✔
3026
        assert_eq!(blocks[1].hash(), block_3.hash());
1✔
3027
        assert_eq!(blocks[2].hash(), block_4.hash());
1✔
3028

1✔
3029
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
1✔
3030
            panic!("Must receive remove of peer block max height");
1✔
3031
        };
1✔
3032

1✔
3033
        assert!(
1✔
3034
            state_lock.lock_guard().await.net.peer_map.is_empty(),
1✔
3035
            "peer map must be empty after closing connection gracefully"
1✔
3036
        );
1✔
3037
    }
1✔
3038

UNCOV
3039
    #[traced_test]
×
3040
    #[tokio::test]
3041
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
1✔
3042
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
3043
        // receives block 3, meaning that block 2 and 1 will have to be requested.
1✔
3044

1✔
3045
        let network = Network::Main;
1✔
3046
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
3047
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3048
        let peer_address = get_dummy_socket_address(0);
1✔
3049
        let genesis_block = Block::genesis(network);
1✔
3050

1✔
3051
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
1✔
3052
            &genesis_block,
1✔
3053
            Timestamp::hours(1),
1✔
3054
            StdRng::seed_from_u64(5550001).random(),
1✔
3055
        )
1✔
3056
        .await;
1✔
3057

1✔
3058
        let mock = Mock::new(vec![
1✔
3059
            Action::Read(PeerMessage::Block(Box::new(
1✔
3060
                block_3.clone().try_into().unwrap(),
1✔
3061
            ))),
1✔
3062
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3063
            Action::Read(PeerMessage::Block(Box::new(
1✔
3064
                block_2.clone().try_into().unwrap(),
1✔
3065
            ))),
1✔
3066
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
3067
            Action::Read(PeerMessage::Block(Box::new(
1✔
3068
                block_1.clone().try_into().unwrap(),
1✔
3069
            ))),
1✔
3070
            Action::Read(PeerMessage::Bye),
1✔
3071
        ]);
1✔
3072

1✔
3073
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3074
            to_main_tx.clone(),
1✔
3075
            state_lock.clone(),
1✔
3076
            peer_address,
1✔
3077
            hsd,
1✔
3078
            true,
1✔
3079
            1,
1✔
3080
            block_3.header().timestamp,
1✔
3081
        );
1✔
3082
        peer_loop_handler
1✔
3083
            .run_wrapper(mock, from_main_rx_clone)
1✔
3084
            .await?;
1✔
3085

1✔
3086
        match to_main_rx1.recv().await {
1✔
3087
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3088
                if blocks[0].hash() != block_1.hash() {
1✔
3089
                    bail!("1st received block by main loop must be block 1");
1✔
3090
                }
1✔
3091
                if blocks[1].hash() != block_2.hash() {
1✔
3092
                    bail!("2nd received block by main loop must be block 2");
1✔
3093
                }
1✔
3094
                if blocks[2].hash() != block_3.hash() {
1✔
3095
                    bail!("3rd received block by main loop must be block 3");
1✔
3096
                }
1✔
3097
            }
1✔
3098
            _ => bail!("Did not find msg sent to main task"),
1✔
3099
        };
1✔
3100
        match to_main_rx1.recv().await {
1✔
3101
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3102
            _ => bail!("Must receive remove of peer block max height"),
1✔
3103
        }
1✔
3104

1✔
3105
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3106
            bail!("peer map must be empty after closing connection gracefully");
1✔
3107
        }
1✔
3108

1✔
3109
        Ok(())
1✔
3110
    }
1✔
3111

UNCOV
3112
    #[traced_test]
×
3113
    #[tokio::test]
3114
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
1✔
3115
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
3116
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3117
        // But the requests are interrupted by the peer sending another message: a new block
1✔
3118
        // notification.
1✔
3119

1✔
3120
        let network = Network::Main;
1✔
3121
        let (
1✔
3122
            _peer_broadcast_tx,
1✔
3123
            from_main_rx_clone,
1✔
3124
            to_main_tx,
1✔
3125
            mut to_main_rx1,
1✔
3126
            mut state_lock,
1✔
3127
            hsd,
1✔
3128
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3129
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
1✔
3130
        let genesis_block: Block = state_lock
1✔
3131
            .lock_guard()
1✔
3132
            .await
1✔
3133
            .chain
1✔
3134
            .archival_state()
1✔
3135
            .get_tip()
1✔
3136
            .await;
1✔
3137

1✔
3138
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
3139
            fake_valid_sequence_of_blocks_for_tests(
1✔
3140
                &genesis_block,
1✔
3141
                Timestamp::hours(1),
1✔
3142
                StdRng::seed_from_u64(5550001).random(),
1✔
3143
            )
1✔
3144
            .await;
1✔
3145
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3146

1✔
3147
        let mock = Mock::new(vec![
1✔
3148
            Action::Read(PeerMessage::Block(Box::new(
1✔
3149
                block_4.clone().try_into().unwrap(),
1✔
3150
            ))),
1✔
3151
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3152
            Action::Read(PeerMessage::Block(Box::new(
1✔
3153
                block_3.clone().try_into().unwrap(),
1✔
3154
            ))),
1✔
3155
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3156
            //
1✔
3157
            // Now make the interruption of the block reconciliation process
1✔
3158
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
1✔
3159
            //
1✔
3160
            // Complete the block reconciliation process by requesting the last block
1✔
3161
            // in this process, to get back to a mutually known block.
1✔
3162
            Action::Read(PeerMessage::Block(Box::new(
1✔
3163
                block_2.clone().try_into().unwrap(),
1✔
3164
            ))),
1✔
3165
            //
1✔
3166
            // Then anticipate the request of the block that was announced
1✔
3167
            // in the interruption.
1✔
3168
            // Note that we cannot anticipate the response, as only the main
1✔
3169
            // task writes to the database. And the database needs to be updated
1✔
3170
            // for the handling of block 5 to be done correctly.
1✔
3171
            Action::Write(PeerMessage::BlockRequestByHeight(
1✔
3172
                block_5.kernel.header.height,
1✔
3173
            )),
1✔
3174
            Action::Read(PeerMessage::Bye),
1✔
3175
        ]);
1✔
3176

1✔
3177
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3178
            to_main_tx.clone(),
1✔
3179
            state_lock.clone(),
1✔
3180
            peer_socket_address,
1✔
3181
            hsd,
1✔
3182
            false,
1✔
3183
            1,
1✔
3184
            block_5.header().timestamp,
1✔
3185
        );
1✔
3186
        peer_loop_handler
1✔
3187
            .run_wrapper(mock, from_main_rx_clone)
1✔
3188
            .await?;
1✔
3189

1✔
3190
        match to_main_rx1.recv().await {
1✔
3191
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3192
                if blocks[0].hash() != block_2.hash() {
1✔
3193
                    bail!("1st received block by main loop must be block 1");
1✔
3194
                }
1✔
3195
                if blocks[1].hash() != block_3.hash() {
1✔
3196
                    bail!("2nd received block by main loop must be block 2");
1✔
3197
                }
1✔
3198
                if blocks[2].hash() != block_4.hash() {
1✔
3199
                    bail!("3rd received block by main loop must be block 3");
1✔
3200
                }
1✔
3201
            }
1✔
3202
            _ => bail!("Did not find msg sent to main task"),
1✔
3203
        };
1✔
3204
        match to_main_rx1.recv().await {
1✔
3205
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3206
            _ => bail!("Must receive remove of peer block max height"),
1✔
3207
        }
1✔
3208

1✔
3209
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3210
            bail!("peer map must be empty after closing connection gracefully");
1✔
3211
        }
1✔
3212

1✔
3213
        Ok(())
1✔
3214
    }
1✔
3215

UNCOV
3216
    #[traced_test]
×
3217
    #[tokio::test]
3218
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
1✔
3219
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
1✔
3220
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3221
        // But the requests are interrupted by the peer sending another message: a request
1✔
3222
        // for a list of peers.
1✔
3223

1✔
3224
        let network = Network::Main;
1✔
3225
        let (
1✔
3226
            _peer_broadcast_tx,
1✔
3227
            from_main_rx_clone,
1✔
3228
            to_main_tx,
1✔
3229
            mut to_main_rx1,
1✔
3230
            mut state_lock,
1✔
3231
            _hsd,
1✔
3232
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
3233
        let genesis_block = Block::genesis(network);
1✔
3234
        let peer_infos: Vec<PeerInfo> = state_lock
1✔
3235
            .lock_guard()
1✔
3236
            .await
1✔
3237
            .net
1✔
3238
            .peer_map
1✔
3239
            .clone()
1✔
3240
            .into_values()
1✔
3241
            .collect::<Vec<_>>();
1✔
3242

1✔
3243
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
3244
            &genesis_block,
1✔
3245
            Timestamp::hours(1),
1✔
3246
            StdRng::seed_from_u64(5550001).random(),
1✔
3247
        )
1✔
3248
        .await;
1✔
3249
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3250

1✔
3251
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3252
        let expected_peer_list_resp = vec![
1✔
3253
            (
1✔
3254
                peer_infos[0].listen_address().unwrap(),
1✔
3255
                peer_infos[0].instance_id(),
1✔
3256
            ),
1✔
3257
            (sa_1, hsd_1.instance_id),
1✔
3258
        ];
1✔
3259
        let mock = Mock::new(vec![
1✔
3260
            Action::Read(PeerMessage::Block(Box::new(
1✔
3261
                block_4.clone().try_into().unwrap(),
1✔
3262
            ))),
1✔
3263
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3264
            Action::Read(PeerMessage::Block(Box::new(
1✔
3265
                block_3.clone().try_into().unwrap(),
1✔
3266
            ))),
1✔
3267
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3268
            //
1✔
3269
            // Now make the interruption of the block reconciliation process
1✔
3270
            Action::Read(PeerMessage::PeerListRequest),
1✔
3271
            //
1✔
3272
            // Answer the request for a peer list
1✔
3273
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
1✔
3274
            //
1✔
3275
            // Complete the block reconciliation process by requesting the last block
1✔
3276
            // in this process, to get back to a mutually known block.
1✔
3277
            Action::Read(PeerMessage::Block(Box::new(
1✔
3278
                block_2.clone().try_into().unwrap(),
1✔
3279
            ))),
1✔
3280
            Action::Read(PeerMessage::Bye),
1✔
3281
        ]);
1✔
3282

1✔
3283
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3284
            to_main_tx,
1✔
3285
            state_lock.clone(),
1✔
3286
            sa_1,
1✔
3287
            hsd_1,
1✔
3288
            true,
1✔
3289
            1,
1✔
3290
            block_4.header().timestamp,
1✔
3291
        );
1✔
3292
        peer_loop_handler
1✔
3293
            .run_wrapper(mock, from_main_rx_clone)
1✔
3294
            .await?;
1✔
3295

1✔
3296
        // Verify that blocks are sent to `main_loop` in expected ordering
1✔
3297
        match to_main_rx1.recv().await {
1✔
3298
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3299
                if blocks[0].hash() != block_2.hash() {
1✔
3300
                    bail!("1st received block by main loop must be block 1");
1✔
3301
                }
1✔
3302
                if blocks[1].hash() != block_3.hash() {
1✔
3303
                    bail!("2nd received block by main loop must be block 2");
1✔
3304
                }
1✔
3305
                if blocks[2].hash() != block_4.hash() {
1✔
3306
                    bail!("3rd received block by main loop must be block 3");
1✔
3307
                }
1✔
3308
            }
1✔
3309
            _ => bail!("Did not find msg sent to main task"),
1✔
3310
        };
1✔
3311

1✔
3312
        assert_eq!(
1✔
3313
            1,
1✔
3314
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
3315
            "One peer must remain in peer list after peer_1 closed gracefully"
1✔
3316
        );
1✔
3317

1✔
3318
        Ok(())
1✔
3319
    }
1✔
3320

UNCOV
3321
    #[traced_test]
×
3322
    #[tokio::test]
3323
    async fn empty_mempool_request_tx_test() {
1✔
3324
        // In this scenario the client receives a transaction notification from
1✔
3325
        // a peer of a transaction it doesn't know; the client must then request it.
1✔
3326

1✔
3327
        let network = Network::Main;
1✔
3328
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
1✔
3329
            get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3330
                .await
1✔
3331
                .unwrap();
1✔
3332

1✔
3333
        let spending_key = state_lock
1✔
3334
            .lock_guard()
1✔
3335
            .await
1✔
3336
            .wallet_state
1✔
3337
            .wallet_entropy
1✔
3338
            .nth_symmetric_key_for_tests(0);
1✔
3339
        let genesis_block = Block::genesis(network);
1✔
3340
        let now = genesis_block.kernel.header.timestamp;
1✔
3341
        let (transaction_1, _, _change_output) = state_lock
1✔
3342
            .lock_guard()
1✔
3343
            .await
1✔
3344
            .create_transaction_with_prover_capability(
1✔
3345
                Default::default(),
1✔
3346
                spending_key.into(),
1✔
3347
                UtxoNotificationMedium::OffChain,
1✔
3348
                NativeCurrencyAmount::coins(0),
1✔
3349
                now,
1✔
3350
                TxProvingCapability::ProofCollection,
1✔
3351
                &TritonVmJobQueue::dummy(),
1✔
3352
            )
1✔
3353
            .await
1✔
3354
            .unwrap();
1✔
3355

1✔
3356
        // Build the resulting transaction notification
1✔
3357
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3358
        let mock = Mock::new(vec![
1✔
3359
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3360
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3361
            Action::Read(PeerMessage::Transaction(Box::new(
1✔
3362
                (&transaction_1).try_into().unwrap(),
1✔
3363
            ))),
1✔
3364
            Action::Read(PeerMessage::Bye),
1✔
3365
        ]);
1✔
3366

1✔
3367
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3368

1✔
3369
        // Mock a timestamp to allow transaction to be considered valid
1✔
3370
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3371
            to_main_tx,
1✔
3372
            state_lock.clone(),
1✔
3373
            get_dummy_socket_address(0),
1✔
3374
            hsd_1.clone(),
1✔
3375
            true,
1✔
3376
            1,
1✔
3377
            now,
1✔
3378
        );
1✔
3379

1✔
3380
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3381

1✔
3382
        assert!(
1✔
3383
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3384
            "Mempool must be empty at init"
1✔
3385
        );
1✔
3386
        peer_loop_handler
1✔
3387
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3388
            .await
1✔
3389
            .unwrap();
1✔
3390

1✔
3391
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
1✔
3392
        // by the `main_loop`.
1✔
3393
        match to_main_rx1.recv().await {
1✔
3394
            Some(PeerTaskToMain::Transaction(_)) => (),
1✔
3395
            _ => panic!("Must receive remove of peer block max height"),
1✔
3396
        };
1✔
3397
    }
1✔
3398

UNCOV
3399
    #[traced_test]
×
3400
    #[tokio::test]
3401
    async fn populated_mempool_request_tx_test() -> Result<()> {
1✔
3402
        // In this scenario the peer is informed of a transaction that it already knows
1✔
3403

1✔
3404
        let network = Network::Main;
1✔
3405
        let (
1✔
3406
            _peer_broadcast_tx,
1✔
3407
            from_main_rx_clone,
1✔
3408
            to_main_tx,
1✔
3409
            mut to_main_rx1,
1✔
3410
            mut state_lock,
1✔
3411
            _hsd,
1✔
3412
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3413
            .await
1✔
3414
            .unwrap();
1✔
3415
        let spending_key = state_lock
1✔
3416
            .lock_guard()
1✔
3417
            .await
1✔
3418
            .wallet_state
1✔
3419
            .wallet_entropy
1✔
3420
            .nth_symmetric_key_for_tests(0);
1✔
3421

1✔
3422
        let genesis_block = Block::genesis(network);
1✔
3423
        let now = genesis_block.kernel.header.timestamp;
1✔
3424
        let (transaction_1, _, _change_output) = state_lock
1✔
3425
            .lock_guard()
1✔
3426
            .await
1✔
3427
            .create_transaction_with_prover_capability(
1✔
3428
                Default::default(),
1✔
3429
                spending_key.into(),
1✔
3430
                UtxoNotificationMedium::OffChain,
1✔
3431
                NativeCurrencyAmount::coins(0),
1✔
3432
                now,
1✔
3433
                TxProvingCapability::ProofCollection,
1✔
3434
                &TritonVmJobQueue::dummy(),
1✔
3435
            )
1✔
3436
            .await
1✔
3437
            .unwrap();
1✔
3438

1✔
3439
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3440
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
3441
            to_main_tx,
1✔
3442
            state_lock.clone(),
1✔
3443
            get_dummy_socket_address(0),
1✔
3444
            hsd_1.clone(),
1✔
3445
            true,
1✔
3446
            1,
1✔
3447
        );
1✔
3448
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3449

1✔
3450
        assert!(
1✔
3451
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3452
            "Mempool must be empty at init"
1✔
3453
        );
1✔
3454
        state_lock
1✔
3455
            .lock_guard_mut()
1✔
3456
            .await
1✔
3457
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
1✔
3458
            .await;
1✔
3459
        assert!(
1✔
3460
            !state_lock.lock_guard().await.mempool.is_empty(),
1✔
3461
            "Mempool must be non-empty after insertion"
1✔
3462
        );
1✔
3463

1✔
3464
        // Run the peer loop and verify expected exchange -- namely that the
1✔
3465
        // tx notification is received and the the transaction is *not*
1✔
3466
        // requested.
1✔
3467
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3468
        let mock = Mock::new(vec![
1✔
3469
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3470
            Action::Read(PeerMessage::Bye),
1✔
3471
        ]);
1✔
3472
        peer_loop_handler
1✔
3473
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3474
            .await
1✔
3475
            .unwrap();
1✔
3476

1✔
3477
        // nothing is allowed to be sent to `main_loop`
1✔
3478
        match to_main_rx1.try_recv() {
1✔
3479
            Err(TryRecvError::Empty) => (),
1✔
3480
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
1✔
3481
            Ok(_) => panic!("to_main channel must be empty"),
1✔
3482
        };
1✔
3483
        Ok(())
1✔
3484
    }
1✔
3485

3486
    mod block_proposals {
3487
        use super::*;
3488
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3489

3490
        struct TestSetup {
3491
            peer_loop_handler: PeerLoopHandler,
3492
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3493
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3494
            peer_state: MutablePeerState,
3495
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3496
            genesis_block: Block,
3497
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3498
        }
3499

3500
        async fn genesis_setup(network: Network) -> TestSetup {
2✔
3501
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
2✔
3502
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2✔
3503
                    .await
2✔
3504
                    .unwrap();
2✔
3505
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3506
            let peer_loop_handler = PeerLoopHandler::new(
2✔
3507
                to_main_tx.clone(),
2✔
3508
                alice.clone(),
2✔
3509
                get_dummy_socket_address(0),
2✔
3510
                peer_hsd.clone(),
2✔
3511
                true,
2✔
3512
                1,
2✔
3513
            );
2✔
3514
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
2✔
3515

2✔
3516
            // (peer_loop_handler, to_main_rx1)
2✔
3517
            TestSetup {
2✔
3518
                peer_broadcast_tx,
2✔
3519
                peer_loop_handler,
2✔
3520
                to_main_rx,
2✔
3521
                from_main_rx,
2✔
3522
                peer_state,
2✔
3523
                to_main_tx,
2✔
3524
                genesis_block: Block::genesis(network),
2✔
3525
            }
2✔
3526
        }
2✔
3527

UNCOV
3528
        #[traced_test]
×
3529
        #[tokio::test]
3530
        async fn accept_block_proposal_height_one() {
1✔
3531
            // Node knows genesis block, receives a block proposal for block 1
1✔
3532
            // and must accept this. Verify that main loop is informed of block
1✔
3533
            // proposal.
1✔
3534
            let TestSetup {
1✔
3535
                peer_broadcast_tx,
1✔
3536
                mut peer_loop_handler,
1✔
3537
                mut to_main_rx,
1✔
3538
                from_main_rx,
1✔
3539
                mut peer_state,
1✔
3540
                to_main_tx,
1✔
3541
                genesis_block,
1✔
3542
            } = genesis_setup(Network::Main).await;
1✔
3543
            let block1 = fake_valid_block_for_tests(
1✔
3544
                &peer_loop_handler.global_state_lock,
1✔
3545
                StdRng::seed_from_u64(5550001).random(),
1✔
3546
            )
1✔
3547
            .await;
1✔
3548

1✔
3549
            let mock = Mock::new(vec![
1✔
3550
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
1✔
3551
                Action::Read(PeerMessage::Bye),
1✔
3552
            ]);
1✔
3553
            peer_loop_handler
1✔
3554
                .run(mock, from_main_rx, &mut peer_state)
1✔
3555
                .await
1✔
3556
                .unwrap();
1✔
3557

1✔
3558
            match to_main_rx.try_recv().unwrap() {
1✔
3559
                PeerTaskToMain::BlockProposal(block) => {
1✔
3560
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
1✔
3561
                }
1✔
3562
                _ => panic!("Expected main loop to be informed of block proposal"),
1✔
3563
            };
1✔
3564

1✔
3565
            drop(to_main_tx);
1✔
3566
            drop(peer_broadcast_tx);
1✔
3567
        }
1✔
3568

UNCOV
3569
        #[traced_test]
×
3570
        #[tokio::test]
3571
        async fn accept_block_proposal_notification_height_one() {
1✔
3572
            // Node knows genesis block, receives a block proposal notification
1✔
3573
            // for block 1 and must accept this by requesting the block
1✔
3574
            // proposal from peer.
1✔
3575
            let TestSetup {
1✔
3576
                peer_broadcast_tx,
1✔
3577
                mut peer_loop_handler,
1✔
3578
                to_main_rx: _,
1✔
3579
                from_main_rx,
1✔
3580
                mut peer_state,
1✔
3581
                to_main_tx,
1✔
3582
                ..
1✔
3583
            } = genesis_setup(Network::Main).await;
1✔
3584
            let block1 = fake_valid_block_for_tests(
1✔
3585
                &peer_loop_handler.global_state_lock,
1✔
3586
                StdRng::seed_from_u64(5550001).random(),
1✔
3587
            )
1✔
3588
            .await;
1✔
3589

1✔
3590
            let mock = Mock::new(vec![
1✔
3591
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
1✔
3592
                Action::Write(PeerMessage::BlockProposalRequest(
1✔
3593
                    BlockProposalRequest::new(block1.body().mast_hash()),
1✔
3594
                )),
1✔
3595
                Action::Read(PeerMessage::Bye),
1✔
3596
            ]);
1✔
3597
            peer_loop_handler
1✔
3598
                .run(mock, from_main_rx, &mut peer_state)
1✔
3599
                .await
1✔
3600
                .unwrap();
1✔
3601

1✔
3602
            drop(to_main_tx);
1✔
3603
            drop(peer_broadcast_tx);
1✔
3604
        }
1✔
3605
    }
3606

3607
    mod proof_qualities {
3608
        use strum::IntoEnumIterator;
3609

3610
        use super::*;
3611
        use crate::config_models::cli_args;
3612
        use crate::models::blockchain::transaction::Transaction;
3613
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3614
        use crate::tests::shared::mock_genesis_global_state;
3615

3616
        async fn tx_of_proof_quality(
2✔
3617
            network: Network,
2✔
3618
            quality: TransactionProofQuality,
2✔
3619
        ) -> Transaction {
2✔
3620
            let wallet_secret = WalletEntropy::devnet_wallet();
2✔
3621
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
2✔
3622
            let alice =
2✔
3623
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
2✔
3624
                    .await;
2✔
3625
            let alice = alice.lock_guard().await;
2✔
3626
            let genesis_block = alice.chain.light_state();
2✔
3627
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
2✔
3628
            let prover_capability = match quality {
2✔
3629
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
1✔
3630
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
1✔
3631
            };
3632
            alice
2✔
3633
                .create_transaction_with_prover_capability(
2✔
3634
                    vec![].into(),
2✔
3635
                    alice_key.into(),
2✔
3636
                    UtxoNotificationMedium::OffChain,
2✔
3637
                    NativeCurrencyAmount::coins(1),
2✔
3638
                    in_seven_months,
2✔
3639
                    prover_capability,
2✔
3640
                    &TritonVmJobQueue::dummy(),
2✔
3641
                )
2✔
3642
                .await
2✔
3643
                .unwrap()
2✔
3644
                .0
2✔
3645
        }
2✔
3646

UNCOV
3647
        #[traced_test]
×
3648
        #[tokio::test]
3649
        async fn client_favors_higher_proof_quality() {
1✔
3650
            // In this scenario the peer is informed of a transaction that it
1✔
3651
            // already knows, and it's tested that it checks the proof quality
1✔
3652
            // field and verifies that it exceeds the proof in the mempool
1✔
3653
            // before requesting the transasction.
1✔
3654
            let network = Network::Main;
1✔
3655
            let proof_collection_tx =
1✔
3656
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
1✔
3657
            let single_proof_tx =
1✔
3658
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
1✔
3659

1✔
3660
            for (own_tx_pq, new_tx_pq) in
4✔
3661
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
1✔
3662
            {
1✔
3663
                use TransactionProofQuality::*;
1✔
3664

1✔
3665
                let (
1✔
3666
                    _peer_broadcast_tx,
4✔
3667
                    from_main_rx_clone,
4✔
3668
                    to_main_tx,
4✔
3669
                    mut to_main_rx1,
4✔
3670
                    mut alice,
4✔
3671
                    handshake_data,
4✔
3672
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
4✔
3673
                    .await
4✔
3674
                    .unwrap();
4✔
3675

1✔
3676
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
4✔
3677
                    (ProofCollection, ProofCollection) => {
1✔
3678
                        (&proof_collection_tx, &proof_collection_tx)
1✔
3679
                    }
1✔
3680
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
1✔
3681
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
1✔
3682
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
1✔
3683
                };
1✔
3684

1✔
3685
                alice
4✔
3686
                    .lock_guard_mut()
4✔
3687
                    .await
4✔
3688
                    .mempool_insert(own_tx.to_owned(), TransactionOrigin::Foreign)
4✔
3689
                    .await;
4✔
3690

1✔
3691
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
4✔
3692

4✔
3693
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4✔
3694
                let mock = if own_proof_is_supreme {
4✔
3695
                    Mock::new(vec![
3✔
3696
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3✔
3697
                        Action::Read(PeerMessage::Bye),
3✔
3698
                    ])
3✔
3699
                } else {
1✔
3700
                    Mock::new(vec![
1✔
3701
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3702
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3703
                        Action::Read(PeerMessage::Transaction(Box::new(
1✔
3704
                            new_tx.try_into().unwrap(),
1✔
3705
                        ))),
1✔
3706
                        Action::Read(PeerMessage::Bye),
1✔
3707
                    ])
1✔
3708
                };
1✔
3709

1✔
3710
                let now = proof_collection_tx.kernel.timestamp;
4✔
3711
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4✔
3712
                    to_main_tx,
4✔
3713
                    alice.clone(),
4✔
3714
                    get_dummy_socket_address(0),
4✔
3715
                    handshake_data.clone(),
4✔
3716
                    true,
4✔
3717
                    1,
4✔
3718
                    now,
4✔
3719
                );
4✔
3720
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4✔
3721

4✔
3722
                peer_loop_handler
4✔
3723
                    .run(mock, from_main_rx_clone, &mut peer_state)
4✔
3724
                    .await
4✔
3725
                    .unwrap();
4✔
3726

4✔
3727
                if own_proof_is_supreme {
4✔
3728
                    match to_main_rx1.try_recv() {
3✔
3729
                        Err(TryRecvError::Empty) => (),
3✔
3730
                        Err(TryRecvError::Disconnected) => {
1✔
3731
                            panic!("to_main channel must still be open")
1✔
3732
                        }
1✔
3733
                        Ok(_) => panic!("to_main channel must be empty"),
1✔
3734
                    }
1✔
3735
                } else {
1✔
3736
                    match to_main_rx1.try_recv() {
1✔
3737
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
1✔
3738
                        Err(TryRecvError::Disconnected) => {
1✔
3739
                            panic!("to_main channel must still be open")
1✔
3740
                        }
1✔
3741
                        Ok(PeerTaskToMain::Transaction(_)) => (),
1✔
3742
                        _ => panic!("Unexpected result from channel"),
1✔
3743
                    }
1✔
3744
                }
1✔
3745
            }
1✔
3746
        }
1✔
3747
    }
3748

3749
    mod sync_challenges {
3750
        use super::*;
3751
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3752

UNCOV
3753
        #[traced_test]
×
3754
        #[tokio::test]
3755
        async fn bad_sync_challenge_height_greater_than_tip() {
1✔
3756
            // Criterium: Challenge height may not exceed that of tip in the
1✔
3757
            // request.
1✔
3758

1✔
3759
            let network = Network::Main;
1✔
3760
            let (
1✔
3761
                _alice_main_to_peer_tx,
1✔
3762
                alice_main_to_peer_rx,
1✔
3763
                alice_peer_to_main_tx,
1✔
3764
                alice_peer_to_main_rx,
1✔
3765
                mut alice,
1✔
3766
                alice_hsd,
1✔
3767
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
3768
                .await
1✔
3769
                .unwrap();
1✔
3770
            let genesis_block: Block = Block::genesis(network);
1✔
3771
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
1✔
3772
                &genesis_block,
1✔
3773
                Timestamp::hours(1),
1✔
3774
                [0u8; 32],
1✔
3775
            )
1✔
3776
            .await;
1✔
3777
            for block in &blocks {
12✔
3778
                alice.set_new_tip(block.clone()).await.unwrap();
11✔
3779
            }
1✔
3780

1✔
3781
            let bh12 = blocks.last().unwrap().header().height;
1✔
3782
            let sync_challenge = SyncChallenge {
1✔
3783
                tip_digest: blocks[9].hash(),
1✔
3784
                challenges: [bh12; 10],
1✔
3785
            };
1✔
3786
            let alice_p2p_messages = Mock::new(vec![
1✔
3787
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3788
                Action::Read(PeerMessage::Bye),
1✔
3789
            ]);
1✔
3790

1✔
3791
            let peer_address = get_dummy_socket_address(0);
1✔
3792
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3793
                alice_peer_to_main_tx.clone(),
1✔
3794
                alice.clone(),
1✔
3795
                peer_address,
1✔
3796
                alice_hsd,
1✔
3797
                false,
1✔
3798
                1,
1✔
3799
            );
1✔
3800
            alice_peer_loop_handler
1✔
3801
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3802
                .await
1✔
3803
                .unwrap();
1✔
3804

1✔
3805
            drop(alice_peer_to_main_rx);
1✔
3806

1✔
3807
            let latest_sanction = alice
1✔
3808
                .lock_guard()
1✔
3809
                .await
1✔
3810
                .net
1✔
3811
                .get_peer_standing_from_database(peer_address.ip())
1✔
3812
                .await
1✔
3813
                .unwrap();
1✔
3814
            assert_eq!(
1✔
3815
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3816
                latest_sanction
1✔
3817
                    .latest_punishment
1✔
3818
                    .expect("peer must be sanctioned")
1✔
3819
                    .0
1✔
3820
            );
1✔
3821
        }
1✔
3822

UNCOV
3823
        #[traced_test]
×
3824
        #[tokio::test]
3825
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
1✔
3826
            // Criterium: Challenge may not point to genesis block, or block 1, as
1✔
3827
            // tip.
1✔
3828

1✔
3829
            let network = Network::Main;
1✔
3830
            let genesis_block: Block = Block::genesis(network);
1✔
3831

1✔
3832
            let alice_cli = cli_args::Args::default();
1✔
3833
            let (
1✔
3834
                _alice_main_to_peer_tx,
1✔
3835
                alice_main_to_peer_rx,
1✔
3836
                alice_peer_to_main_tx,
1✔
3837
                alice_peer_to_main_rx,
1✔
3838
                alice,
1✔
3839
                alice_hsd,
1✔
3840
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
1✔
3841

1✔
3842
            let sync_challenge = SyncChallenge {
1✔
3843
                tip_digest: genesis_block.hash(),
1✔
3844
                challenges: [BlockHeight::genesis(); 10],
1✔
3845
            };
1✔
3846

1✔
3847
            let alice_p2p_messages = Mock::new(vec![
1✔
3848
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3849
                Action::Read(PeerMessage::Bye),
1✔
3850
            ]);
1✔
3851

1✔
3852
            let peer_address = get_dummy_socket_address(0);
1✔
3853
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3854
                alice_peer_to_main_tx.clone(),
1✔
3855
                alice.clone(),
1✔
3856
                peer_address,
1✔
3857
                alice_hsd,
1✔
3858
                false,
1✔
3859
                1,
1✔
3860
            );
1✔
3861
            alice_peer_loop_handler
1✔
3862
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3863
                .await
1✔
3864
                .unwrap();
1✔
3865

1✔
3866
            drop(alice_peer_to_main_rx);
1✔
3867

1✔
3868
            let latest_sanction = alice
1✔
3869
                .lock_guard()
1✔
3870
                .await
1✔
3871
                .net
1✔
3872
                .get_peer_standing_from_database(peer_address.ip())
1✔
3873
                .await
1✔
3874
                .unwrap();
1✔
3875
            assert_eq!(
1✔
3876
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3877
                latest_sanction
1✔
3878
                    .latest_punishment
1✔
3879
                    .expect("peer must be sanctioned")
1✔
3880
                    .0
1✔
3881
            );
1✔
3882
        }
1✔
3883

UNCOV
3884
        #[traced_test]
×
3885
        #[tokio::test]
3886
        async fn sync_challenge_happy_path() -> Result<()> {
1✔
3887
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
1✔
3888
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
1✔
3889
            // sync mode.
1✔
3890

1✔
3891
            let mut rng = rand::rng();
1✔
3892
            let network = Network::Main;
1✔
3893
            let genesis_block: Block = Block::genesis(network);
1✔
3894

1✔
3895
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
1✔
3896
            let alice_cli = cli_args::Args {
1✔
3897
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
1✔
3898
                ..Default::default()
1✔
3899
            };
1✔
3900
            let (
1✔
3901
                _alice_main_to_peer_tx,
1✔
3902
                alice_main_to_peer_rx,
1✔
3903
                alice_peer_to_main_tx,
1✔
3904
                mut alice_peer_to_main_rx,
1✔
3905
                mut alice,
1✔
3906
                alice_hsd,
1✔
3907
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
1✔
3908
            let _alice_socket_address = get_dummy_socket_address(0);
1✔
3909

1✔
3910
            let (
1✔
3911
                _bob_main_to_peer_tx,
1✔
3912
                _bob_main_to_peer_rx,
1✔
3913
                _bob_peer_to_main_tx,
1✔
3914
                _bob_peer_to_main_rx,
1✔
3915
                mut bob,
1✔
3916
                _bob_hsd,
1✔
3917
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3918
            let bob_socket_address = get_dummy_socket_address(0);
1✔
3919

1✔
3920
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
3921
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
1✔
3922
            assert!(
1✔
3923
                block_1.is_valid(&genesis_block, now).await,
1✔
3924
                "Block must be valid for this test to make sense"
1✔
3925
            );
1✔
3926
            let alice_tip = &block_1;
1✔
3927
            alice.set_new_tip(block_1.clone()).await?;
1✔
3928
            bob.set_new_tip(block_1.clone()).await?;
1✔
3929

1✔
3930
            // produce enough blocks to ensure alice needs to go into sync mode
1✔
3931
            // with this block notification.
1✔
3932
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
1✔
3933
                &block_1,
1✔
3934
                TARGET_BLOCK_INTERVAL,
1✔
3935
                rng.random(),
1✔
3936
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
1✔
3937
            )
1✔
3938
            .await;
1✔
3939
            for block in &blocks {
12✔
3940
                bob.set_new_tip(block.clone()).await?;
11✔
3941
            }
1✔
3942
            let bob_tip = blocks.last().unwrap();
1✔
3943

1✔
3944
            let block_notification_from_bob = PeerBlockNotification {
1✔
3945
                hash: bob_tip.hash(),
1✔
3946
                height: bob_tip.header().height,
1✔
3947
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
1✔
3948
            };
1✔
3949

1✔
3950
            let alice_rng_seed = rng.random::<[u8; 32]>();
1✔
3951
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
1✔
3952
            let sync_challenge_from_alice = SyncChallenge::generate(
1✔
3953
                &block_notification_from_bob,
1✔
3954
                alice_tip.header().height,
1✔
3955
                alice_rng_clone.random(),
1✔
3956
            );
1✔
3957

1✔
3958
            println!(
1✔
3959
                "sync challenge from alice:\n{:?}",
1✔
3960
                sync_challenge_from_alice
1✔
3961
            );
1✔
3962

1✔
3963
            let sync_challenge_response_from_bob = bob
1✔
3964
                .lock_guard()
1✔
3965
                .await
1✔
3966
                .response_to_sync_challenge(sync_challenge_from_alice)
1✔
3967
                .await
1✔
3968
                .expect("should be able to respond to sync challenge");
1✔
3969

1✔
3970
            let alice_p2p_messages = Mock::new(vec![
1✔
3971
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
3972
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
1✔
3973
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
1✔
3974
                    sync_challenge_response_from_bob,
1✔
3975
                ))),
1✔
3976
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
3977
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
1✔
3978
                // The absence of a Write here checks that a 2nd challenge isn't sent
1✔
3979
                // when a successful was just received.
1✔
3980
                Action::Read(PeerMessage::Bye),
1✔
3981
            ]);
1✔
3982

1✔
3983
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3984
                alice_peer_to_main_tx.clone(),
1✔
3985
                alice.clone(),
1✔
3986
                bob_socket_address,
1✔
3987
                alice_hsd,
1✔
3988
                false,
1✔
3989
                1,
1✔
3990
                bob_tip.header().timestamp,
1✔
3991
            );
1✔
3992
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
1✔
3993
            alice_peer_loop_handler
1✔
3994
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3995
                .await?;
1✔
3996

1✔
3997
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
1✔
3998
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
1✔
3999
            expected_anchor_mmra.append(bob_tip.hash());
1✔
4000
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
4001
                peer_address: bob_socket_address,
1✔
4002
                claimed_height: bob_tip.header().height,
1✔
4003
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
1✔
4004
                claimed_block_mmra: expected_anchor_mmra,
1✔
4005
            };
1✔
4006
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
1✔
4007
            assert_eq!(
1✔
4008
                expected_message_from_alice_peer_loop,
1✔
4009
                observed_message_from_alice_peer_loop
1✔
4010
            );
1✔
4011

1✔
4012
            Ok(())
1✔
4013
        }
1✔
4014
    }
4015
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc