• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 13897743430

14 Mar 2025 03:15PM UTC coverage: 43.131% (-41.0%) from 84.172%
13897743430

push

github

jan-ferdinand
feat: Conditionally connect to bootstrap nodes

In order to help newcomers to connect to peers on the network, certain
nodes act as bootstrap nodes. Those nodes tend to be well known, and
users tend to specify them as potential peers via command line
arguments.

Previously, connections to potential peers specified as command line
arguments were always re-initiated when dropped. This can cause an
involuntary denial-of-service of those well-known bootstrap nodes.

Now, a well-connected node does not initiate connections with bootstrap
nodes.

119 of 164 new or added lines in 4 files covered. (72.56%)

656 existing lines in 10 files now uncovered.

2926 of 6784 relevant lines covered (43.13%)

1571562.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.0
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::Duration;
5
use std::time::SystemTime;
6

7
use anyhow::bail;
8
use anyhow::Result;
9
use chrono::DateTime;
10
use chrono::Utc;
11
use futures::sink::Sink;
12
use futures::sink::SinkExt;
13
use futures::stream::TryStream;
14
use futures::stream::TryStreamExt;
15
use itertools::Itertools;
16
use rand::rngs::StdRng;
17
use rand::Rng;
18
use rand::SeedableRng;
19
use tasm_lib::triton_vm::prelude::Digest;
20
use tasm_lib::twenty_first::prelude::Mmr;
21
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
22
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
23
use tokio::select;
24
use tokio::sync::broadcast;
25
use tokio::sync::mpsc;
26
use tracing::debug;
27
use tracing::error;
28
use tracing::info;
29
use tracing::warn;
30

31
use crate::connect_to_peers::close_peer_connected_callback;
32
use crate::macros::fn_name;
33
use crate::macros::log_slow_scope;
34
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
35
use crate::models::blockchain::block::block_height::BlockHeight;
36
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
37
use crate::models::blockchain::block::Block;
38
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
39
use crate::models::blockchain::transaction::Transaction;
40
use crate::models::channel::MainToPeerTask;
41
use crate::models::channel::PeerTaskToMain;
42
use crate::models::channel::PeerTaskToMainTransaction;
43
use crate::models::peer::bootstrap_info::BootstrapInfo;
44
use crate::models::peer::bootstrap_info::BootstrapStatus;
45
use crate::models::peer::handshake_data::HandshakeData;
46
use crate::models::peer::peer_info::PeerConnectionInfo;
47
use crate::models::peer::peer_info::PeerInfo;
48
use crate::models::peer::transfer_block::TransferBlock;
49
use crate::models::peer::BlockProposalRequest;
50
use crate::models::peer::BlockRequestBatch;
51
use crate::models::peer::IssuedSyncChallenge;
52
use crate::models::peer::MutablePeerState;
53
use crate::models::peer::NegativePeerSanction;
54
use crate::models::peer::PeerMessage;
55
use crate::models::peer::PeerSanction;
56
use crate::models::peer::PeerStanding;
57
use crate::models::peer::PositivePeerSanction;
58
use crate::models::peer::SyncChallenge;
59
use crate::models::proof_abstractions::mast_hash::MastHash;
60
use crate::models::proof_abstractions::timestamp::Timestamp;
61
use crate::models::state::block_proposal::BlockProposalRejectError;
62
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
63
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
64
use crate::models::state::GlobalState;
65
use crate::models::state::GlobalStateLock;
66
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
67

68
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
69
const MAX_PEER_LIST_LENGTH: usize = 10;
70
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
71

72
const KEEP_CONNECTION_ALIVE: bool = false;
73
const DISCONNECT_CONNECTION: bool = true;
74

75
pub type PeerStandingNumber = i32;
76

77
/// Handles messages from peers via TCP
78
///
79
/// also handles messages from main task over the main-to-peer-tasks broadcast
80
/// channel.
81
#[derive(Debug, Clone)]
82
pub struct PeerLoopHandler {
83
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
84
    global_state_lock: GlobalStateLock,
85
    peer_address: SocketAddr,
86
    peer_handshake_data: HandshakeData,
87
    inbound_connection: bool,
88
    distance: u8,
89
    rng: StdRng,
90
    #[cfg(test)]
91
    mock_now: Option<Timestamp>,
92
}
93

94
impl PeerLoopHandler {
95
    pub(crate) fn new(
39✔
96
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
39✔
97
        global_state_lock: GlobalStateLock,
39✔
98
        peer_address: SocketAddr,
39✔
99
        peer_handshake_data: HandshakeData,
39✔
100
        inbound_connection: bool,
39✔
101
        distance: u8,
39✔
102
    ) -> Self {
39✔
103
        Self {
39✔
104
            to_main_tx,
39✔
105
            global_state_lock,
39✔
106
            peer_address,
39✔
107
            peer_handshake_data,
39✔
108
            inbound_connection,
39✔
109
            distance,
39✔
110
            rng: StdRng::from_rng(&mut rand::rng()),
39✔
111
            #[cfg(test)]
39✔
112
            mock_now: None,
39✔
113
        }
39✔
114
    }
39✔
115

116
    /// Allows for mocked timestamps such that time dependencies may be tested.
117
    #[cfg(test)]
118
    pub(crate) fn with_mocked_time(
20✔
119
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
20✔
120
        global_state_lock: GlobalStateLock,
20✔
121
        peer_address: SocketAddr,
20✔
122
        peer_handshake_data: HandshakeData,
20✔
123
        inbound_connection: bool,
20✔
124
        distance: u8,
20✔
125
        mocked_time: Timestamp,
20✔
126
    ) -> Self {
20✔
127
        Self {
20✔
128
            to_main_tx,
20✔
129
            global_state_lock,
20✔
130
            peer_address,
20✔
131
            peer_handshake_data,
20✔
132
            inbound_connection,
20✔
133
            distance,
20✔
134
            mock_now: Some(mocked_time),
20✔
135
            rng: StdRng::from_rng(&mut rand::rng()),
20✔
136
        }
20✔
137
    }
20✔
138

139
    /// Overwrite the random number generator object with a specific one.
140
    ///
141
    /// Useful for derandomizing tests.
142
    #[cfg(test)]
143
    fn set_rng(&mut self, rng: StdRng) {
1✔
144
        self.rng = rng;
1✔
145
    }
1✔
146

147
    fn now(&self) -> Timestamp {
27✔
148
        #[cfg(not(test))]
27✔
149
        {
27✔
150
            Timestamp::now()
27✔
151
        }
27✔
152
        #[cfg(test)]
27✔
153
        {
27✔
154
            self.mock_now.unwrap_or(Timestamp::now())
27✔
155
        }
27✔
156
    }
27✔
157

158
    /// Punish a peer for bad behavior.
159
    ///
160
    /// Return `Err` if the peer in question is (now) banned.
161
    ///
162
    /// # Locking:
163
    ///   * acquires `global_state_lock` for write
164
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
165
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
166
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
167
        debug!(
6✔
168
            "Peer standing before punishment is {}",
×
169
            global_state_mut
×
170
                .net
×
UNCOV
171
                .peer_map
×
UNCOV
172
                .get(&self.peer_address)
×
UNCOV
173
                .unwrap()
×
174
                .standing
175
        );
176

177
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
UNCOV
178
            bail!("Could not read peer map.");
×
179
        };
180
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
181
        if let Err(err) = sanction_result {
6✔
182
            warn!("Banning peer: {err}");
1✔
183
        }
5✔
184

185
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
186
    }
6✔
187

188
    /// Reward a peer for good behavior.
189
    ///
190
    /// Return `Err` if the peer in question is banned.
191
    ///
192
    /// # Locking:
193
    ///   * acquires `global_state_lock` for write
194
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
7✔
195
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
7✔
196
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
7✔
197
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
7✔
198
            error!("Could not read peer map.");
1✔
199
            return Ok(());
1✔
200
        };
201
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
6✔
202
        if sanction_result.is_err() {
6✔
UNCOV
203
            error!("Cannot reward banned peer");
×
204
        }
6✔
205

206
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
6✔
207
    }
7✔
208

209
    /// Potentially update a peer's bootstrap status.
210
    ///
211
    /// Per connection, the bootstrap status can be set once without
212
    /// incurring [punishment](Self::punish). After a cooldown period, the
213
    /// bootstrap status can be set again. The cooldown exists to prevent peers
214
    /// from forcing many write-lock acquisition of the global state lock.
215
    ///
216
    /// This method only acquires a write lock for the global state lock if
217
    /// necessary.
218
    ///
219
    /// # Locking:
220
    ///   * acquires `global_state_lock` for read
221
    ///   * might acquire `global_state_lock` for write
222
    async fn handle_bootstrap_status_message(&mut self, status: BootstrapStatus) -> Result<()> {
20✔
223
        const BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD: Duration = Duration::from_secs(5 * 60);
224

225
        let peer_address = self.peer_address;
20✔
226
        let Some(connection_start_time) = self
20✔
227
            .global_state_lock
20✔
228
            .lock_guard()
20✔
229
            .await
20✔
230
            .net
231
            .peer_map
232
            .get(&peer_address)
20✔
233
            .map(|info| info.own_timestamp_connection_established)
20✔
234
        else {
UNCOV
235
            warn!("Peer {peer_address} not found in peer map when updating bootstrap status.");
×
UNCOV
236
            return Ok(()); // not great, not ban-worthy
×
237
        };
238
        let last_update_time = self
20✔
239
            .global_state_lock
20✔
240
            .lock_guard()
20✔
241
            .await
20✔
242
            .net
243
            .bootstrap_status
244
            .get(&peer_address)
20✔
245
            .map(|info| info.last_set)
20✔
246
            .unwrap_or(SystemTime::UNIX_EPOCH);
20✔
247

20✔
248
        if last_update_time < connection_start_time {
20✔
249
            debug!("Setting bootstrap status of peer {peer_address} to \"{status}\"");
20✔
250
            self.global_state_lock
20✔
251
                .lock_guard_mut()
20✔
252
                .await
20✔
253
                .net
254
                .bootstrap_status
255
                .insert(peer_address, BootstrapInfo::new(status));
20✔
256
            return Ok(());
20✔
UNCOV
257
        }
×
258

UNCOV
259
        let Ok(duration_since_last_update) = SystemTime::now().duration_since(last_update_time)
×
260
        else {
UNCOV
261
            warn!("Last bootstrap update of peer {peer_address} is in the future.");
×
UNCOV
262
            return Ok(()); // not great, not ban-worthy
×
263
        };
UNCOV
264
        if duration_since_last_update < BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD {
×
UNCOV
265
            info!(
×
UNCOV
266
                "Punishing peer {peer_address} for bootstrap update within cooldown period. \
×
UNCOV
267
                Update received after {duration} seconds of last update, \
×
UNCOV
268
                cooldown period is {cooldown} seconds.",
×
UNCOV
269
                duration = duration_since_last_update.as_secs(),
×
UNCOV
270
                cooldown = BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD.as_secs(),
×
271
            );
UNCOV
272
            self.punish(NegativePeerSanction::BootstrapStatusUpdateSpam)
×
UNCOV
273
                .await?;
×
UNCOV
274
            return Ok(());
×
UNCOV
275
        }
×
UNCOV
276

×
UNCOV
277
        debug!("Updating bootstrap status of peer {peer_address} to \"{status}\"");
×
UNCOV
278
        self.global_state_lock
×
UNCOV
279
            .lock_guard_mut()
×
UNCOV
280
            .await
×
281
            .net
282
            .bootstrap_status
UNCOV
283
            .insert(peer_address, BootstrapInfo::new(status));
×
UNCOV
284
        self.punish(NegativePeerSanction::BootstrapStatusUpdate)
×
UNCOV
285
            .await?;
×
286

287
        Ok(())
×
288
    }
20✔
289

290
    /// Construct a batch response, with blocks and their MMR membership proofs
291
    /// relative to a specified anchor.
292
    ///
293
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
294
    /// a block height of the response exceeds own tip height.
295
    async fn batch_response(
16✔
296
        state: &GlobalState,
16✔
297
        blocks: Vec<Block>,
16✔
298
        anchor: &MmrAccumulator,
16✔
299
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
300
        let own_tip_height = state.chain.light_state().header().height;
16✔
301
        let block_heights_match_anchor = blocks
16✔
302
            .iter()
16✔
303
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
304
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
305
        if !block_heights_match_anchor || !block_heights_known {
16✔
UNCOV
306
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
UNCOV
307
                Some(height) => height.to_string(),
×
UNCOV
308
                None => "None".to_owned(),
×
309
            };
310

311
            debug!("max_block_height: {max_block_height}");
×
UNCOV
312
            debug!("own_tip_height: {own_tip_height}");
×
UNCOV
313
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
UNCOV
314
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
UNCOV
315
            debug!("block_heights_known: {block_heights_known}");
×
UNCOV
316
            return None;
×
317
        }
16✔
318

16✔
319
        let mut ret = vec![];
16✔
320
        for block in blocks {
62✔
321
            let mmr_mp = state
46✔
322
                .chain
46✔
323
                .archival_state()
46✔
324
                .archival_block_mmr
46✔
325
                .ammr()
46✔
326
                .prove_membership_relative_to_smaller_mmr(
46✔
327
                    block.header().height.into(),
46✔
328
                    anchor.num_leafs(),
46✔
329
                )
46✔
330
                .await;
46✔
331
            let block: TransferBlock = block.try_into().unwrap();
46✔
332
            ret.push((block, mmr_mp));
46✔
333
        }
334

335
        Some(ret)
16✔
336
    }
16✔
337

338
    /// Handle validation and send all blocks to the main task if they're all
339
    /// valid. Use with a list of blocks or a single block. When the
340
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
341
    /// list is the `i`th block. The parent of element zero in this list is
342
    /// `parent_of_first_block`.
343
    ///
344
    /// # Return Value
345
    ///  - `Err` when the connection should be closed;
346
    ///  - `Ok(None)` if some block is invalid
347
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
348
    ///    are not syncing;
349
    ///  - `Ok(None)` if the last block has insufficient height and we are
350
    ///    syncing;
351
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
352
    ///    highest height in the batch.
353
    ///
354
    /// A return value of Ok(Some(_)) means that the message was passed on to
355
    /// main loop.
356
    ///
357
    /// # Locking
358
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
359
    ///     `self.reward(..)`.
360
    ///
361
    /// # Panics
362
    ///
363
    ///  - Panics if called with the empty list.
364
    async fn handle_blocks(
8✔
365
        &mut self,
8✔
366
        received_blocks: Vec<Block>,
8✔
367
        parent_of_first_block: Block,
8✔
368
    ) -> Result<Option<BlockHeight>> {
8✔
369
        debug!(
8✔
UNCOV
370
            "attempting to validate {} {}",
×
UNCOV
371
            received_blocks.len(),
×
UNCOV
372
            if received_blocks.len() == 1 {
×
UNCOV
373
                "block"
×
374
            } else {
UNCOV
375
                "blocks"
×
376
            }
377
        );
378
        let now = self.now();
8✔
379
        debug!("validating with respect to current timestamp {now}");
8✔
380
        let mut previous_block = &parent_of_first_block;
8✔
381
        for new_block in &received_blocks {
24✔
382
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
17✔
383
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
17✔
384
            let new_block_is_valid = new_block.is_valid(previous_block, now).await;
17✔
385
            debug!("new block is valid? {new_block_is_valid}");
17✔
386
            if !new_block_has_proof_of_work {
17✔
387
                warn!(
1✔
UNCOV
388
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
UNCOV
389
                    new_block.kernel.header.height, self.peer_address
×
390
                );
391
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
392
                warn!(
1✔
UNCOV
393
                    "Proof of work should be {} (or more) but was [{}].",
×
UNCOV
394
                    previous_block.kernel.header.difficulty.target(),
×
UNCOV
395
                    new_block.hash().values().iter().join(", ")
×
396
                );
397
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
398
                    new_block.kernel.header.height,
1✔
399
                    new_block.hash(),
1✔
400
                )))
1✔
401
                .await?;
1✔
402
                warn!("Failed to validate block due to insufficient PoW");
1✔
403
                return Ok(None);
1✔
404
            } else if !new_block_is_valid {
16✔
UNCOV
405
                warn!(
×
UNCOV
406
                    "Received invalid block of height {} from peer with IP {}",
×
UNCOV
407
                    new_block.kernel.header.height, self.peer_address
×
408
                );
UNCOV
409
                self.punish(NegativePeerSanction::InvalidBlock((
×
UNCOV
410
                    new_block.kernel.header.height,
×
UNCOV
411
                    new_block.hash(),
×
UNCOV
412
                )))
×
UNCOV
413
                .await?;
×
UNCOV
414
                warn!("Failed to validate block: invalid block");
×
UNCOV
415
                return Ok(None);
×
416
            }
16✔
417
            info!(
16✔
UNCOV
418
                "Block with height {} is valid. mined: {}",
×
UNCOV
419
                new_block.kernel.header.height,
×
UNCOV
420
                new_block.kernel.header.timestamp.standard_format()
×
421
            );
422

423
            previous_block = new_block;
16✔
424
        }
425

426
        // evaluate the fork choice rule
427
        debug!("Checking last block's canonicity ...");
7✔
428
        let last_block = received_blocks.last().unwrap();
7✔
429
        let is_canonical = self
7✔
430
            .global_state_lock
7✔
431
            .lock_guard()
7✔
432
            .await
7✔
433
            .incoming_block_is_more_canonical(last_block);
7✔
434
        let last_block_height = last_block.header().height;
7✔
435
        let sync_mode_active_and_have_new_champion = self
7✔
436
            .global_state_lock
7✔
437
            .lock_guard()
7✔
438
            .await
7✔
439
            .net
440
            .sync_anchor
441
            .as_ref()
7✔
442
            .is_some_and(|x| {
7✔
UNCOV
443
                x.champion
×
UNCOV
444
                    .is_none_or(|(height, _)| height < last_block_height)
×
445
            });
7✔
446
        if !is_canonical && !sync_mode_active_and_have_new_champion {
7✔
447
            warn!(
1✔
UNCOV
448
                "Received {} blocks from peer but incoming blocks are less \
×
UNCOV
449
            canonical than current tip, or current sync-champion.",
×
UNCOV
450
                received_blocks.len()
×
451
            );
452
            return Ok(None);
1✔
453
        }
6✔
454

6✔
455
        // Send the new blocks to the main task which handles the state update
6✔
456
        // and storage to the database.
6✔
457
        let number_of_received_blocks = received_blocks.len();
6✔
458
        self.to_main_tx
6✔
459
            .send(PeerTaskToMain::NewBlocks(received_blocks))
6✔
460
            .await?;
6✔
461
        info!(
6✔
UNCOV
462
            "Updated block info by block from peer. block height {}",
×
463
            last_block_height
464
        );
465

466
        // Valuable, new, hard-to-produce information. Reward peer.
467
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
6✔
468
            .await?;
6✔
469

470
        Ok(Some(last_block_height))
6✔
471
    }
8✔
472

473
    /// Take a single block received from a peer and (attempt to) find a path
474
    /// between the received block and a common ancestor stored in the blocks
475
    /// database.
476
    ///
477
    /// This function attempts to find the parent of the received block, either
478
    /// by searching the database or by requesting it from a peer.
479
    ///  - If the parent is not stored, it is requested from the peer and the
480
    ///    received block is pushed to the fork reconciliation list for later
481
    ///    handling by this function. The fork reconciliation list starts out
482
    ///    empty, but grows as more parents are requested and transmitted.
483
    ///  - If the parent is found in the database, a) block handling continues:
484
    ///    the entire list of fork reconciliation blocks are passed down the
485
    ///    pipeline, potentially leading to a state update; and b) the fork
486
    ///    reconciliation list is cleared.
487
    ///
488
    /// Locking:
489
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
490
    ///     `self.reward(..)`.
491
    async fn try_ensure_path<S>(
20✔
492
        &mut self,
20✔
493
        received_block: Box<Block>,
20✔
494
        peer: &mut S,
20✔
495
        peer_state: &mut MutablePeerState,
20✔
496
    ) -> Result<()>
20✔
497
    where
20✔
498
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
20✔
499
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
20✔
500
        <S as TryStream>::Error: std::error::Error,
20✔
501
    {
20✔
502
        // Does the received block match the fork reconciliation list?
503
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
20✔
504
            peer_state.fork_reconciliation_blocks.last()
20✔
505
        {
506
            let valid = successor
10✔
507
                .is_valid(received_block.as_ref(), self.now())
10✔
508
                .await;
10✔
509
            if !valid {
10✔
UNCOV
510
                warn!(
×
UNCOV
511
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
UNCOV
512
                        peer_state.fork_reconciliation_blocks.len() + 1
×
513
                    );
514
            }
10✔
515
            valid
10✔
516
        } else {
517
            true
10✔
518
        };
519

520
        // Are we running out of RAM?
521
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
20✔
522
            >= self.global_state_lock.cli().sync_mode_threshold;
20✔
523
        if too_many_blocks {
20✔
524
            warn!(
1✔
UNCOV
525
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
UNCOV
526
                peer_state.fork_reconciliation_blocks.len() + 1
×
527
            );
528
        }
19✔
529

530
        // Block mismatch or too many blocks: abort!
531
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
20✔
532
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
533
                received_block.header().height,
1✔
534
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
535
                received_block.hash(),
1✔
536
            )))
1✔
537
            .await?;
1✔
538
            peer_state.fork_reconciliation_blocks = vec![];
1✔
539
            return Ok(());
1✔
540
        }
19✔
541

19✔
542
        // otherwise, append
19✔
543
        peer_state.fork_reconciliation_blocks.push(*received_block);
19✔
544

19✔
545
        // Try fetch parent
19✔
546
        let received_block_header = *peer_state
19✔
547
            .fork_reconciliation_blocks
19✔
548
            .last()
19✔
549
            .unwrap()
19✔
550
            .header();
19✔
551

19✔
552
        let parent_digest = received_block_header.prev_block_digest;
19✔
553
        let parent_height = received_block_header.height.previous()
19✔
554
            .expect("transferred block must have previous height because genesis block cannot be transferred");
19✔
555
        debug!("Try ensure path: fetching parent block");
19✔
556
        let parent_block = self
19✔
557
            .global_state_lock
19✔
558
            .lock_guard()
19✔
559
            .await
19✔
560
            .chain
561
            .archival_state()
19✔
562
            .get_block(parent_digest)
19✔
563
            .await?;
19✔
564
        debug!(
19✔
UNCOV
565
            "Completed parent block fetching from DB: {}",
×
UNCOV
566
            if parent_block.is_some() {
×
UNCOV
567
                "found".to_string()
×
568
            } else {
UNCOV
569
                "not found".to_string()
×
570
            }
571
        );
572

573
        // If parent is not known (but not genesis) request it.
574
        let Some(parent_block) = parent_block else {
19✔
575
            if parent_height.is_genesis() {
11✔
576
                peer_state.fork_reconciliation_blocks.clear();
1✔
577
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
UNCOV
578
                return Ok(());
×
579
            }
10✔
580
            info!(
10✔
UNCOV
581
                "Parent not known: Requesting previous block with height {} from peer",
×
582
                parent_height
583
            );
584

585
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
586
                .await?;
10✔
587

588
            return Ok(());
10✔
589
        };
590

591
        // We want to treat the received fork reconciliation blocks (plus the
592
        // received block) in reverse order, from oldest to newest, because
593
        // they were requested from high to low block height.
594
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
8✔
595
        new_blocks.reverse();
8✔
596

8✔
597
        // Reset the fork resolution state since we got all the way back to a
8✔
598
        // block that we have.
8✔
599
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
8✔
600
        peer_state.fork_reconciliation_blocks.clear();
8✔
601

602
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
8✔
603
            // If `BlockNotification` was received during a block reconciliation
604
            // event, then the peer might have one (or more (unlikely)) blocks
605
            // that we do not have. We should thus request those blocks.
606
            if fork_reconciliation_event
6✔
607
                && peer_state.highest_shared_block_height > new_block_height
6✔
608
            {
609
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
610
                    peer_state.highest_shared_block_height,
1✔
611
                ))
1✔
612
                .await?;
1✔
613
            }
5✔
614
        }
2✔
615

616
        Ok(())
8✔
617
    }
20✔
618

619
    /// Handle peer messages and returns Ok(true) if connection should be closed.
620
    /// Connection should also be closed if an error is returned.
621
    /// Otherwise, returns OK(false).
622
    ///
623
    /// Locking:
624
    ///   * Acquires `global_state_lock` for read.
625
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
626
    ///     `self.reward(..)`.
627
    async fn handle_peer_message<S>(
129✔
628
        &mut self,
129✔
629
        msg: PeerMessage,
129✔
630
        peer: &mut S,
129✔
631
        peer_state_info: &mut MutablePeerState,
129✔
632
    ) -> Result<bool>
129✔
633
    where
129✔
634
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
129✔
635
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
129✔
636
        <S as TryStream>::Error: std::error::Error,
129✔
637
    {
129✔
638
        debug!(
129✔
UNCOV
639
            "Received {} from peer {}",
×
UNCOV
640
            msg.get_type(),
×
641
            self.peer_address
642
        );
643
        match msg {
129✔
644
            PeerMessage::Bye => {
645
                // Note that the current peer is not removed from the global_state.peer_map here
646
                // but that this is done by the caller.
647
                info!("Got bye. Closing connection to peer");
58✔
648
                Ok(DISCONNECT_CONNECTION)
58✔
649
            }
650
            PeerMessage::PeerListRequest => {
651
                let peer_info = {
2✔
652
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
2✔
653

654
                    // We are interested in the address on which peers accept ingoing connections,
655
                    // not in the address in which they are connected to us. We are only interested in
656
                    // peers that accept incoming connections.
657
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
2✔
658
                        .global_state_lock
2✔
659
                        .lock_guard()
2✔
660
                        .await
2✔
661
                        .net
662
                        .peer_map
663
                        .values()
2✔
664
                        .filter(|peer_info| peer_info.listen_address().is_some())
5✔
665
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
2✔
666
                        .map(|peer_info| {
5✔
667
                            (
5✔
668
                                // unwrap is safe bc of above `filter`
5✔
669
                                peer_info.listen_address().unwrap(),
5✔
670
                                peer_info.instance_id(),
5✔
671
                            )
5✔
672
                        })
5✔
673
                        .collect();
2✔
674

2✔
675
                    // We sort the returned list, so this function is easier to test
2✔
676
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
677
                    peer_info
2✔
678
                };
2✔
679

2✔
680
                debug!("Responding with: {:?}", peer_info);
2✔
681
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
2✔
682
                Ok(KEEP_CONNECTION_ALIVE)
2✔
683
            }
UNCOV
684
            PeerMessage::PeerListResponse(peers) => {
×
UNCOV
685
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
×
UNCOV
686

×
UNCOV
687
                if peers.len() > MAX_PEER_LIST_LENGTH {
×
UNCOV
688
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
UNCOV
689
                        .await?;
×
UNCOV
690
                }
×
UNCOV
691
                self.to_main_tx
×
NEW
692
                    .send(PeerTaskToMain::PeerDiscoveryAnswer(
×
UNCOV
693
                        peers,
×
UNCOV
694
                        self.distance + 1,
×
NEW
695
                    ))
×
UNCOV
696
                    .await?;
×
UNCOV
697
                Ok(KEEP_CONNECTION_ALIVE)
×
698
            }
699
            PeerMessage::BlockNotificationRequest => {
UNCOV
700
                debug!("Got BlockNotificationRequest");
×
701

UNCOV
702
                peer.send(PeerMessage::BlockNotification(
×
UNCOV
703
                    self.global_state_lock
×
UNCOV
704
                        .lock_guard()
×
705
                        .await
×
706
                        .chain
707
                        .light_state()
×
708
                        .into(),
×
UNCOV
709
                ))
×
UNCOV
710
                .await?;
×
711

UNCOV
712
                Ok(KEEP_CONNECTION_ALIVE)
×
713
            }
714
            PeerMessage::BlockNotification(block_notification) => {
4✔
715
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
716

717
                let (tip_header, sync_anchor_is_set) = {
4✔
718
                    let state = self.global_state_lock.lock_guard().await;
4✔
719
                    (
4✔
720
                        *state.chain.light_state().header(),
4✔
721
                        state.net.sync_anchor.is_some(),
4✔
722
                    )
4✔
723
                };
4✔
724
                debug!(
4✔
UNCOV
725
                    "Got BlockNotification of height {}. Own height is {}",
×
726
                    block_notification.height, tip_header.height
727
                );
728

729
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
4✔
730
                let now = self.now();
4✔
731
                let time_since_latest_successful_challenge = peer_state_info
4✔
732
                    .successful_sync_challenge_response_time
4✔
733
                    .map(|then| now - then);
4✔
734
                let cooldown_expired = time_since_latest_successful_challenge
4✔
735
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
4✔
736
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
4✔
737
                    &tip_header,
4✔
738
                    block_notification.height,
4✔
739
                    block_notification.cumulative_proof_of_work,
4✔
740
                    sync_mode_threshold,
4✔
741
                );
4✔
742
                if cooldown_expired && exceeds_sync_mode_threshold {
4✔
743
                    debug!("sync mode criterion satisfied.");
1✔
744

745
                    if peer_state_info.sync_challenge.is_some() {
1✔
UNCOV
746
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
UNCOV
747
                        return Ok(KEEP_CONNECTION_ALIVE);
×
748
                    }
1✔
749

1✔
750
                    info!(
1✔
UNCOV
751
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
752
                    );
753
                    let challenge = SyncChallenge::generate(
1✔
754
                        &block_notification,
1✔
755
                        tip_header.height,
1✔
756
                        self.rng.random(),
1✔
757
                    );
1✔
758
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
759
                        challenge,
1✔
760
                        block_notification.cumulative_proof_of_work,
1✔
761
                        self.now(),
1✔
762
                    ));
1✔
763

1✔
764
                    debug!("sending challenge ...");
1✔
765
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
766

767
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
768
                }
3✔
769

3✔
770
                peer_state_info.highest_shared_block_height = block_notification.height;
3✔
771
                let block_is_new = tip_header.cumulative_proof_of_work
3✔
772
                    < block_notification.cumulative_proof_of_work;
3✔
773

3✔
774
                debug!("block_is_new: {}", block_is_new);
3✔
775

776
                if block_is_new
3✔
777
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
3✔
778
                    && !sync_anchor_is_set
2✔
779
                    && !exceeds_sync_mode_threshold
2✔
780
                {
781
                    debug!(
1✔
UNCOV
782
                        "sending BlockRequestByHeight to peer for block with height {}",
×
783
                        block_notification.height
784
                    );
785
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
1✔
786
                        .await?;
1✔
787
                } else {
788
                    debug!(
2✔
UNCOV
789
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
UNCOV
790
                        block_notification.height,
×
UNCOV
791
                        block_is_new,
×
UNCOV
792
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
793
                    );
794
                }
795

796
                Ok(KEEP_CONNECTION_ALIVE)
3✔
797
            }
798
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
UNCOV
799
                let response = {
×
800
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
801

2✔
802
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
803

804
                    let response = self
2✔
805
                        .global_state_lock
2✔
806
                        .lock_guard()
2✔
807
                        .await
2✔
808
                        .response_to_sync_challenge(sync_challenge)
2✔
809
                        .await;
2✔
810

811
                    match response {
2✔
UNCOV
812
                        Ok(resp) => resp,
×
813
                        Err(e) => {
2✔
814
                            warn!("could not generate sync challenge response:\n{e}");
2✔
815
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
816
                                .await?;
2✔
817
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
818
                        }
819
                    }
820
                };
821

UNCOV
822
                info!(
×
823
                    "Responding to sync challenge from {}",
×
824
                    self.peer_address.ip()
×
825
                );
UNCOV
826
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
UNCOV
827
                    .await?;
×
828

UNCOV
829
                Ok(KEEP_CONNECTION_ALIVE)
×
830
            }
831
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
832
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
833

834
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
835
                info!(
1✔
UNCOV
836
                    "Got sync challenge response from {}",
×
UNCOV
837
                    self.peer_address.ip()
×
838
                );
839

840
                // The purpose of the sync challenge and sync challenge response
841
                // is to avoid going into sync mode based on a malicious target
842
                // fork. Instead of verifying that the claimed proof-of-work
843
                // number is correct (which would require sending and verifying,
844
                // at least, all blocks between luca (whatever that is) and the
845
                // claimed tip), we use a heuristic that requires less
846
                // communication and less verification work. The downside of
847
                // using a heuristic here is a nonzero false positive and false
848
                // negative rate. Note that the false negative event
849
                // (maliciously sending someone into sync mode based on a bogus
850
                // fork) still requires a significant amount of work from the
851
                // attacker, *in addition* to being lucky. Also, down the line
852
                // succinctness (and more specifically, recursive block
853
                // validation) obviates this entire subprotocol.
854

855
                // Did we issue a challenge?
856
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
UNCOV
857
                    warn!("Sync challenge response was not prompted.");
×
858
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
859
                        .await?;
×
UNCOV
860
                    return Ok(KEEP_CONNECTION_ALIVE);
×
861
                };
862

863
                // Reset the challenge, regardless of the response's success.
864
                peer_state_info.sync_challenge = None;
1✔
865

1✔
866
                // Does response match issued challenge?
1✔
867
                if !challenge_response.matches(issued_challenge) {
1✔
UNCOV
868
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
UNCOV
869
                        .await?;
×
UNCOV
870
                    return Ok(KEEP_CONNECTION_ALIVE);
×
871
                }
1✔
872

1✔
873
                // Does response verify?
1✔
874
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
875
                let now = self.now();
1✔
876
                if !challenge_response.is_valid(now).await {
1✔
UNCOV
877
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
UNCOV
878
                        .await?;
×
UNCOV
879
                    return Ok(KEEP_CONNECTION_ALIVE);
×
880
                }
1✔
881

882
                // Does cumulative proof-of-work evolve reasonably?
883
                let own_tip_header = *self
1✔
884
                    .global_state_lock
1✔
885
                    .lock_guard()
1✔
886
                    .await
1✔
887
                    .chain
888
                    .light_state()
1✔
889
                    .header();
1✔
890
                if !challenge_response
1✔
891
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
892
                {
UNCOV
893
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
UNCOV
894
                        .await?;
×
UNCOV
895
                    return Ok(KEEP_CONNECTION_ALIVE);
×
896
                }
1✔
897

1✔
898
                // Is there some specific (*i.e.*, not aggregate) proof of work?
1✔
899
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
UNCOV
900
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
UNCOV
901
                        .await?;
×
UNCOV
902
                    return Ok(KEEP_CONNECTION_ALIVE);
×
903
                }
1✔
904

1✔
905
                // Did it come in time?
1✔
906
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
UNCOV
907
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
UNCOV
908
                        .await?;
×
UNCOV
909
                    return Ok(KEEP_CONNECTION_ALIVE);
×
910
                }
1✔
911

1✔
912
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
913
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
914

1✔
915
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
916
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
917

1✔
918
                // Inform main loop
1✔
919
                self.to_main_tx
1✔
920
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
921
                        peer_address: self.peer_address,
1✔
922
                        claimed_height: claimed_tip_height,
1✔
923
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
924
                        claimed_block_mmra: sync_mmra_anchor,
1✔
925
                    })
1✔
926
                    .await?;
1✔
927

928
                Ok(KEEP_CONNECTION_ALIVE)
1✔
929
            }
UNCOV
930
            PeerMessage::BlockRequestByHash(block_digest) => {
×
UNCOV
931
                match self
×
UNCOV
932
                    .global_state_lock
×
UNCOV
933
                    .lock_guard()
×
UNCOV
934
                    .await
×
935
                    .chain
936
                    .archival_state()
×
937
                    .get_block(block_digest)
×
938
                    .await?
×
939
                {
940
                    None => {
941
                        // TODO: Consider punishing here
UNCOV
942
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
UNCOV
943
                        Ok(KEEP_CONNECTION_ALIVE)
×
944
                    }
UNCOV
945
                    Some(b) => {
×
UNCOV
946
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
UNCOV
947
                            .await?;
×
UNCOV
948
                        Ok(KEEP_CONNECTION_ALIVE)
×
949
                    }
950
                }
951
            }
952
            PeerMessage::BlockRequestByHeight(block_height) => {
4✔
953
                let block_response = {
3✔
954
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
4✔
955

4✔
956
                    debug!("Got BlockRequestByHeight of height {}", block_height);
4✔
957

958
                    let canonical_block_digest = self
4✔
959
                        .global_state_lock
4✔
960
                        .lock_guard()
4✔
961
                        .await
4✔
962
                        .chain
963
                        .archival_state()
4✔
964
                        .archival_block_mmr
4✔
965
                        .ammr()
4✔
966
                        .try_get_leaf(block_height.into())
4✔
967
                        .await;
4✔
968

969
                    let canonical_block_digest = match canonical_block_digest {
4✔
970
                        None => {
971
                            let own_tip_height = self
1✔
972
                                .global_state_lock
1✔
973
                                .lock_guard()
1✔
974
                                .await
1✔
975
                                .chain
976
                                .light_state()
1✔
977
                                .header()
1✔
978
                                .height;
1✔
979
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
980
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
981
                                .await?;
1✔
982

983
                            return Ok(KEEP_CONNECTION_ALIVE);
1✔
984
                        }
985
                        Some(digest) => digest,
3✔
986
                    };
987

988
                    let canonical_chain_block: Block = self
3✔
989
                        .global_state_lock
3✔
990
                        .lock_guard()
3✔
991
                        .await
3✔
992
                        .chain
993
                        .archival_state()
3✔
994
                        .get_block(canonical_block_digest)
3✔
995
                        .await?
3✔
996
                        .unwrap();
3✔
997

3✔
998
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
3✔
999
                };
3✔
1000

3✔
1001
                debug!("Sending block");
3✔
1002
                peer.send(block_response).await?;
3✔
1003
                debug!("Sent block");
3✔
1004
                Ok(KEEP_CONNECTION_ALIVE)
3✔
1005
            }
1006
            PeerMessage::Block(t_block) => {
20✔
1007
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
20✔
1008

20✔
1009
                info!(
20✔
UNCOV
1010
                    "Got new block from peer {}, height {}, mined {}",
×
UNCOV
1011
                    self.peer_address,
×
UNCOV
1012
                    t_block.header.height,
×
UNCOV
1013
                    t_block.header.timestamp.standard_format()
×
1014
                );
1015
                let new_block_height = t_block.header.height;
20✔
1016

1017
                let block = match Block::try_from(*t_block) {
20✔
1018
                    Ok(block) => Box::new(block),
20✔
UNCOV
1019
                    Err(e) => {
×
UNCOV
1020
                        warn!("Peer sent invalid block: {e:?}");
×
UNCOV
1021
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
UNCOV
1022
                            .await?;
×
1023

UNCOV
1024
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1025
                    }
1026
                };
1027

1028
                // Update the value for the highest known height that peer possesses iff
1029
                // we are not in a fork reconciliation state.
1030
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
20✔
1031
                    peer_state_info.highest_shared_block_height = new_block_height;
10✔
1032
                }
10✔
1033

1034
                self.try_ensure_path(block, peer, peer_state_info).await?;
20✔
1035

1036
                // Reward happens as part of `try_ensure_path`
1037

1038
                Ok(KEEP_CONNECTION_ALIVE)
19✔
1039
            }
1040
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
1041
                known_blocks,
8✔
1042
                max_response_len,
8✔
1043
                anchor,
8✔
1044
            }) => {
8✔
1045
                debug!(
8✔
UNCOV
1046
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
1047
                    self.peer_address
1048
                );
1049

1050
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
UNCOV
1051
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
UNCOV
1052
                        .await?;
×
1053

UNCOV
1054
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1055
                }
8✔
1056

1057
                // The last block in the list of the peers known block is the
1058
                // earliest block, block with lowest height, the peer has
1059
                // requested. If it does not belong to canonical chain, none of
1060
                // the later will. So we can do an early abort in that case.
1061
                let least_preferred = match known_blocks.last() {
8✔
1062
                    Some(least_preferred) => *least_preferred,
8✔
1063
                    None => {
UNCOV
1064
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
UNCOV
1065
                            .await?;
×
1066

UNCOV
1067
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1068
                    }
1069
                };
1070

1071
                let state = self.global_state_lock.lock_guard().await;
8✔
1072
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1073
                let luca_is_known = state
8✔
1074
                    .chain
8✔
1075
                    .archival_state()
8✔
1076
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1077
                    .await;
8✔
1078
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
UNCOV
1079
                    drop(state);
×
UNCOV
1080
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
UNCOV
1081
                        .await?;
×
UNCOV
1082
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1083

UNCOV
1084
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1085
                }
8✔
1086

1087
                // Happy case: At least *one* of the blocks referenced by peer
1088
                // is known to us.
1089
                let first_block_in_response = {
8✔
1090
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1091
                    for block_digest in known_blocks {
10✔
1092
                        if state
10✔
1093
                            .chain
10✔
1094
                            .archival_state()
10✔
1095
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1096
                            .await
10✔
1097
                        {
1098
                            let height = state
8✔
1099
                                .chain
8✔
1100
                                .archival_state()
8✔
1101
                                .get_block_header(block_digest)
8✔
1102
                                .await
8✔
1103
                                .unwrap()
8✔
1104
                                .height;
8✔
1105
                            first_block_in_response = Some(height);
8✔
1106
                            debug!(
8✔
UNCOV
1107
                                "Found block in canonical chain for batch response: {}",
×
1108
                                block_digest
1109
                            );
1110
                            break;
8✔
1111
                        }
2✔
1112
                    }
1113

1114
                    first_block_in_response
8✔
1115
                        .expect("existence of LUCA should have been established already.")
8✔
1116
                };
8✔
1117

8✔
1118
                debug!(
8✔
UNCOV
1119
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1120
                 Now building response from that height."
×
1121
                );
1122

1123
                // Get the relevant blocks, at most batch-size many, descending from the
1124
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1125
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1126
                let max_response_len = cmp::min(
8✔
1127
                    max_response_len,
8✔
1128
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1129
                );
8✔
1130
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1131
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1132

8✔
1133
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1134
                let response_start_height: u64 = first_block_in_response.into();
8✔
1135
                let mut i: u64 = 1;
8✔
1136
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1137
                    let block_height = response_start_height + i;
31✔
1138
                    match state
31✔
1139
                        .chain
31✔
1140
                        .archival_state()
31✔
1141
                        .archival_block_mmr
31✔
1142
                        .ammr()
31✔
1143
                        .try_get_leaf(block_height)
31✔
1144
                        .await
31✔
1145
                    {
1146
                        Some(digest) => {
23✔
1147
                            digests_of_returned_blocks.push(digest);
23✔
1148
                        }
23✔
1149
                        None => break,
8✔
1150
                    }
1151
                    i += 1;
23✔
1152
                }
1153

1154
                let mut returned_blocks: Vec<Block> =
8✔
1155
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1156
                for block_digest in digests_of_returned_blocks {
31✔
1157
                    let block = state
23✔
1158
                        .chain
23✔
1159
                        .archival_state()
23✔
1160
                        .get_block(block_digest)
23✔
1161
                        .await?
23✔
1162
                        .unwrap();
23✔
1163
                    returned_blocks.push(block);
23✔
1164
                }
1165

1166
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1167

1168
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1169
                drop(state);
8✔
1170

1171
                let Some(response) = response else {
8✔
1172
                    warn!("Unable to satisfy batch-block request");
×
1173
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1174
                        .await?;
×
1175
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1176
                };
1177

1178
                debug!("Returning {} blocks in batch response", response.len());
8✔
1179

1180
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1181
                peer.send(response).await?;
8✔
1182

1183
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1184
            }
UNCOV
1185
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
UNCOV
1186
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
UNCOV
1187

×
1188
                debug!(
×
1189
                    "handling block response batch with {} blocks",
×
UNCOV
1190
                    authenticated_blocks.len()
×
1191
                );
1192

1193
                // (Alan:) why is there even a minimum?
UNCOV
1194
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
UNCOV
1195
                    warn!("Got smaller batch response than allowed");
×
1196
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1197
                        .await?;
×
1198
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1199
                }
×
1200

1201
                // Verify that we are in fact in syncing mode
1202
                // TODO: Separate peer messages into those allowed under syncing
1203
                // and those that are not
UNCOV
1204
                let Some(sync_anchor) = self
×
1205
                    .global_state_lock
×
1206
                    .lock_guard()
×
1207
                    .await
×
1208
                    .net
1209
                    .sync_anchor
1210
                    .clone()
×
1211
                else {
UNCOV
1212
                    warn!("Received a batch of blocks without being in syncing mode");
×
UNCOV
1213
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1214
                        .await?;
×
1215
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1216
                };
1217

1218
                // Verify that the response matches the current state
1219
                // We get the latest block from the DB here since this message is
1220
                // only valid for archival nodes.
1221
                let (first_block, _) = &authenticated_blocks[0];
×
UNCOV
1222
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
UNCOV
1223
                let most_canonical_own_block_match: Option<Block> = self
×
UNCOV
1224
                    .global_state_lock
×
UNCOV
1225
                    .lock_guard()
×
UNCOV
1226
                    .await
×
1227
                    .chain
1228
                    .archival_state()
×
1229
                    .get_block(first_blocks_parent_digest)
×
1230
                    .await
×
UNCOV
1231
                    .expect("Block lookup must succeed");
×
UNCOV
1232
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
UNCOV
1233
                    Some(block) => block,
×
1234
                    None => {
UNCOV
1235
                        warn!("Got batch response with invalid start block");
×
UNCOV
1236
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1237
                            .await?;
×
1238
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1239
                    }
1240
                };
1241

1242
                // Convert all blocks to Block objects
UNCOV
1243
                debug!(
×
UNCOV
1244
                    "Found own block of height {} to match received batch",
×
UNCOV
1245
                    most_canonical_own_block_match.kernel.header.height
×
1246
                );
1247
                let mut received_blocks = vec![];
×
1248
                for (t_block, membership_proof) in authenticated_blocks {
×
1249
                    let Ok(block) = Block::try_from(t_block) else {
×
1250
                        warn!("Received invalid transfer block from peer");
×
UNCOV
1251
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
UNCOV
1252
                            .await?;
×
UNCOV
1253
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1254
                    };
1255

1256
                    if !membership_proof.verify(
×
1257
                        block.header().height.into(),
×
1258
                        block.hash(),
×
UNCOV
1259
                        &sync_anchor.block_mmr.peaks(),
×
UNCOV
1260
                        sync_anchor.block_mmr.num_leafs(),
×
UNCOV
1261
                    ) {
×
UNCOV
1262
                        warn!("Authentication of received block fails relative to anchor");
×
UNCOV
1263
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
UNCOV
1264
                            .await?;
×
UNCOV
1265
                        return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1266
                    }
×
UNCOV
1267

×
UNCOV
1268
                    received_blocks.push(block);
×
1269
                }
1270

1271
                // Get the latest block that we know of and handle all received blocks
1272
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
UNCOV
1273
                    .await?;
×
1274

1275
                // Reward happens as part of `handle_blocks`.
1276

UNCOV
1277
                Ok(KEEP_CONNECTION_ALIVE)
×
1278
            }
1279
            PeerMessage::UnableToSatisfyBatchRequest => {
UNCOV
1280
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
UNCOV
1281
                warn!(
×
UNCOV
1282
                    "Peer {} reports inability to satisfy batch request.",
×
1283
                    self.peer_address
1284
                );
1285

UNCOV
1286
                Ok(KEEP_CONNECTION_ALIVE)
×
1287
            }
1288
            PeerMessage::Handshake(_) => {
1289
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1290

×
UNCOV
1291
                // The handshake should have been sent during connection
×
UNCOV
1292
                // initialization. Here it is out of order at best, malicious at
×
1293
                // worst.
×
1294
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1295
                Ok(KEEP_CONNECTION_ALIVE)
×
1296
            }
1297
            PeerMessage::ConnectionStatus(_) => {
1298
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1299

×
1300
                // The connection status should have been sent during connection
×
1301
                // initialization. Here it is out of order at best, malicious at
×
1302
                // worst.
×
1303

×
1304
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
UNCOV
1305
                Ok(KEEP_CONNECTION_ALIVE)
×
1306
            }
1307
            PeerMessage::Transaction(transaction) => {
2✔
1308
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
2✔
1309

2✔
1310
                debug!(
2✔
UNCOV
1311
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
UNCOV
1312
                    transaction.kernel.inputs.len(),
×
1313
                    transaction.kernel.outputs.len(),
×
1314
                    transaction.kernel.mutator_set_hash
×
1315
                );
1316

1317
                let transaction: Transaction = (*transaction).into();
2✔
1318

2✔
1319
                // 1. If transaction is invalid, punish.
2✔
1320
                if !transaction.is_valid().await {
2✔
UNCOV
1321
                    warn!("Received invalid tx");
×
UNCOV
1322
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1323
                        .await?;
×
1324
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1325
                }
2✔
1326

2✔
1327
                // 2. If transaction has coinbase, punish.
2✔
1328
                // Transactions received from peers have not been mined yet.
2✔
1329
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
2✔
1330
                if transaction.kernel.coinbase.is_some() {
2✔
1331
                    warn!("Received non-mined transaction with coinbase.");
×
1332
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
UNCOV
1333
                        .await?;
×
UNCOV
1334
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1335
                }
2✔
1336

2✔
1337
                // 3. If negative fee, punish.
2✔
1338
                if transaction.kernel.fee.is_negative() {
2✔
UNCOV
1339
                    warn!("Received negative-fee transaction.");
×
UNCOV
1340
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
UNCOV
1341
                        .await?;
×
UNCOV
1342
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1343
                }
2✔
1344

2✔
1345
                // 4. If transaction is already known, ignore.
2✔
1346
                if self
2✔
1347
                    .global_state_lock
2✔
1348
                    .lock_guard()
2✔
1349
                    .await
2✔
1350
                    .mempool
1351
                    .contains_with_higher_proof_quality(
1352
                        transaction.kernel.txid(),
2✔
1353
                        transaction.proof.proof_quality()?,
2✔
1354
                    )
1355
                {
UNCOV
1356
                    warn!("Received transaction that was already known");
×
1357

1358
                    // We received a transaction that we *probably* haven't requested.
1359
                    // Consider punishing here, if this is abused.
UNCOV
1360
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1361
                }
2✔
1362

1363
                // 5. if transaction is not confirmable, punish.
1364
                let mutator_set_accumulator_after = self
2✔
1365
                    .global_state_lock
2✔
1366
                    .lock_guard()
2✔
1367
                    .await
2✔
1368
                    .chain
1369
                    .light_state()
2✔
1370
                    .mutator_set_accumulator_after();
2✔
1371
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
2✔
UNCOV
1372
                    warn!(
×
UNCOV
1373
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
UNCOV
1374
                        transaction.kernel.txid()
×
1375
                    );
1376
                    // get fine-grained error code for informative logging
UNCOV
1377
                    let confirmability_error_code = transaction
×
UNCOV
1378
                        .kernel
×
UNCOV
1379
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
UNCOV
1380
                    match confirmability_error_code {
×
UNCOV
1381
                        Ok(_) => unreachable!(),
×
UNCOV
1382
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
UNCOV
1383
                            warn!("invalid removal record (at index {index})");
×
UNCOV
1384
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
UNCOV
1385
                            let removal_record_error_code = invalid_removal_record
×
UNCOV
1386
                                .validate_inner(&mutator_set_accumulator_after);
×
UNCOV
1387
                            debug!(
×
UNCOV
1388
                                "Absolute index set of removal record {index}: {:?}",
×
1389
                                invalid_removal_record.absolute_indices
1390
                            );
UNCOV
1391
                            match removal_record_error_code {
×
UNCOV
1392
                                Ok(_) => unreachable!(),
×
1393
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
UNCOV
1394
                                    debug!("invalid because membership proof is missing");
×
1395
                                }
1396
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
UNCOV
1397
                                    chunk_index,
×
UNCOV
1398
                                }) => {
×
UNCOV
1399
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1400
                                }
1401
                            };
UNCOV
1402
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
UNCOV
1403
                                .await?;
×
UNCOV
1404
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1405
                        }
1406
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
UNCOV
1407
                            warn!("duplicate inputs");
×
UNCOV
1408
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
UNCOV
1409
                                .await?;
×
UNCOV
1410
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1411
                        }
UNCOV
1412
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
UNCOV
1413
                            warn!("already spent input (at index {index})");
×
UNCOV
1414
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
UNCOV
1415
                                .await?;
×
UNCOV
1416
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1417
                        }
1418
                    };
1419
                }
2✔
1420

2✔
1421
                // If transaction cannot be applied to mutator set, punish.
2✔
1422
                // I don't think this can happen when above checks pass but we include
2✔
1423
                // the check to ensure that transaction can be applied.
2✔
1424
                let ms_update = MutatorSetUpdate::new(
2✔
1425
                    transaction.kernel.inputs.clone(),
2✔
1426
                    transaction.kernel.outputs.clone(),
2✔
1427
                );
2✔
1428
                let can_apply = ms_update
2✔
1429
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
2✔
1430
                    .is_ok();
2✔
1431
                if !can_apply {
2✔
1432
                    warn!("Cannot apply transaction to current mutator set.");
×
1433
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1434
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1435
                        .await?;
×
UNCOV
1436
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1437
                }
2✔
1438

2✔
1439
                let tx_timestamp = transaction.kernel.timestamp;
2✔
1440

2✔
1441
                // 6. Ignore if transaction is too old
2✔
1442
                let now = self.now();
2✔
1443
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
2✔
1444
                    // TODO: Consider punishing here
1445
                    warn!("Received too old tx");
×
UNCOV
1446
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1447
                }
2✔
1448

2✔
1449
                // 7. Ignore if transaction is too far into the future
2✔
1450
                if tx_timestamp
2✔
1451
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
2✔
1452
                {
1453
                    // TODO: Consider punishing here
UNCOV
1454
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
UNCOV
1455
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1456
                }
2✔
1457

1458
                // Otherwise, relay to main
1459
                let pt2m_transaction = PeerTaskToMainTransaction {
2✔
1460
                    transaction,
2✔
1461
                    confirmable_for_block: self
2✔
1462
                        .global_state_lock
2✔
1463
                        .lock_guard()
2✔
1464
                        .await
2✔
1465
                        .chain
1466
                        .light_state()
2✔
1467
                        .hash(),
2✔
1468
                };
2✔
1469
                self.to_main_tx
2✔
1470
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
2✔
1471
                    .await?;
2✔
1472

1473
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1474
            }
1475
            PeerMessage::TransactionNotification(tx_notification) => {
6✔
1476
                // addresses #457
6✔
1477
                // new scope for state read-lock to avoid holding across peer.send()
6✔
1478
                {
6✔
1479
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
6✔
1480

1481
                    // 1. Ignore if we already know this transaction, and
1482
                    // the proof quality is not higher than what we already know.
1483
                    let state = self.global_state_lock.lock_guard().await;
6✔
1484
                    let transaction_of_same_or_higher_proof_quality_is_known =
6✔
1485
                        state.mempool.contains_with_higher_proof_quality(
6✔
1486
                            tx_notification.txid,
6✔
1487
                            tx_notification.proof_quality,
6✔
1488
                        );
6✔
1489
                    if transaction_of_same_or_higher_proof_quality_is_known {
6✔
1490
                        debug!("transaction with same or higher proof quality was already known");
4✔
1491
                        return Ok(KEEP_CONNECTION_ALIVE);
4✔
1492
                    }
2✔
1493

2✔
1494
                    // Only accept transactions that do not require executing
2✔
1495
                    // `update`.
2✔
1496
                    if state
2✔
1497
                        .chain
2✔
1498
                        .light_state()
2✔
1499
                        .mutator_set_accumulator_after()
2✔
1500
                        .hash()
2✔
1501
                        != tx_notification.mutator_set_hash
2✔
1502
                    {
UNCOV
1503
                        debug!("transaction refers to non-canonical mutator set state");
×
UNCOV
1504
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1505
                    }
2✔
1506
                }
2✔
1507

2✔
1508
                // 2. Request the actual `Transaction` from peer
2✔
1509
                debug!("requesting transaction from peer");
2✔
1510
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
2✔
1511
                    .await?;
2✔
1512

1513
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1514
            }
1515
            PeerMessage::TransactionRequest(transaction_identifier) => {
×
1516
                if let Some(transaction) = self
×
UNCOV
1517
                    .global_state_lock
×
UNCOV
1518
                    .lock_guard()
×
1519
                    .await
×
1520
                    .mempool
UNCOV
1521
                    .get(transaction_identifier)
×
1522
                {
UNCOV
1523
                    if let Ok(transfer_transaction) = transaction.try_into() {
×
UNCOV
1524
                        peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
×
UNCOV
1525
                            .await?;
×
1526
                    } else {
UNCOV
1527
                        warn!("Peer requested transaction that cannot be converted to transfer object");
×
1528
                    }
UNCOV
1529
                }
×
1530

UNCOV
1531
                Ok(KEEP_CONNECTION_ALIVE)
×
1532
            }
1533
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1534
                let verdict = self
1✔
1535
                    .global_state_lock
1✔
1536
                    .lock_guard()
1✔
1537
                    .await
1✔
1538
                    .favor_incoming_block_proposal(
1✔
1539
                        block_proposal_notification.height,
1✔
1540
                        block_proposal_notification.guesser_fee,
1✔
1541
                    );
1✔
1542
                match verdict {
1✔
1543
                    Ok(_) => {
1544
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1545
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1546
                        ))
1✔
1547
                        .await?
1✔
1548
                    }
UNCOV
1549
                    Err(reject_reason) => info!(
×
UNCOV
1550
                        "Got unfavorable block proposal notification from {} peer; rejecting. Reason:\n{reject_reason}",
×
1551
                        self.peer_address
1552
                    ),
1553
                }
1554

1555
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1556
            }
UNCOV
1557
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
UNCOV
1558
                let matching_proposal = self
×
UNCOV
1559
                    .global_state_lock
×
UNCOV
1560
                    .lock_guard()
×
UNCOV
1561
                    .await
×
1562
                    .mining_state
1563
                    .block_proposal
UNCOV
1564
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
UNCOV
1565
                    .map(|x| x.to_owned());
×
UNCOV
1566
                if let Some(proposal) = matching_proposal {
×
UNCOV
1567
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
UNCOV
1568
                        .await?;
×
1569
                } else {
1570
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1571
                        .await?;
×
1572
                }
1573

1574
                Ok(KEEP_CONNECTION_ALIVE)
×
1575
            }
1576
            PeerMessage::BlockProposal(block) => {
1✔
1577
                info!("Got block proposal from peer.");
1✔
1578

1579
                let should_punish = {
1✔
1580
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1581

1582
                    let (verdict, tip) = {
1✔
1583
                        let state = self.global_state_lock.lock_guard().await;
1✔
1584

1585
                        let verdict = state.favor_incoming_block_proposal(
1✔
1586
                            block.header().height,
1✔
1587
                            block.total_guesser_reward(),
1✔
1588
                        );
1✔
1589
                        let tip = state.chain.light_state().to_owned();
1✔
1590
                        (verdict, tip)
1✔
1591
                    };
1592

1593
                    if let Err(rejection_reason) = verdict {
1✔
UNCOV
1594
                        match rejection_reason {
×
1595
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1596
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1597
                                if Some(received) == current =>
×
1598
                            {
×
1599
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1600
                                None
×
1601
                            }
1602
                            _ => {
1603
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1604
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1605
                            }
1606
                        }
1607
                    } else {
1608
                        // Verify validity and that proposal is child of current tip
1609
                        if block.is_valid(&tip, self.now()).await {
1✔
1610
                            None // all is well, no punishment.
1✔
1611
                        } else {
UNCOV
1612
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1613
                        }
1614
                    }
1615
                };
1616

1617
                if let Some(sanction) = should_punish {
1✔
1618
                    self.punish(sanction).await?;
×
1619
                } else {
1620
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1621
                        .await?;
1✔
1622

1623
                    // Valuable, new, hard-to-produce information. Reward peer.
1624
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1625
                }
1626

1627
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1628
            }
1629
            PeerMessage::BootstrapStatus(status) => {
20✔
1630
                self.handle_bootstrap_status_message(status).await?;
20✔
1631
                Ok(KEEP_CONNECTION_ALIVE)
20✔
1632
            }
1633
        }
1634
    }
129✔
1635

1636
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1637
    ///
1638
    /// the channel could potentially fill up in which case the send() will
1639
    /// block until there is capacity.  we wrap the send() so we can log if
1640
    /// that ever happens to the extent it passes slow-scope threshold.
1641
    async fn send_to_main(
1✔
1642
        &self,
1✔
1643
        msg: PeerTaskToMain,
1✔
1644
        line: u32,
1✔
1645
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1646
        // we measure across the send() in case the channel ever fills up.
1✔
1647
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1648

1✔
1649
        self.to_main_tx.send(msg).await
1✔
1650
    }
1✔
1651

1652
    /// Handle message from main task. The boolean return value indicates if
1653
    /// the connection should be closed.
1654
    ///
1655
    /// Locking:
1656
    ///   * acquires `global_state_lock` for write via Self::punish()
1657
    async fn handle_main_task_message<S>(
×
1658
        &mut self,
×
1659
        msg: MainToPeerTask,
×
1660
        peer: &mut S,
×
1661
        peer_state_info: &mut MutablePeerState,
×
1662
    ) -> Result<bool>
×
UNCOV
1663
    where
×
1664
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
×
1665
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
×
1666
        <S as TryStream>::Error: std::error::Error,
×
1667
    {
×
1668
        debug!("Handling {} message from main in peer loop", msg.get_type());
×
1669
        match msg {
×
1670
            MainToPeerTask::Block(block) => {
×
1671
                // We don't currently differentiate whether a new block came from a peer, or from our
×
UNCOV
1672
                // own miner. It's always shared through this logic.
×
UNCOV
1673
                let new_block_height = block.kernel.header.height;
×
1674
                if new_block_height > peer_state_info.highest_shared_block_height {
×
UNCOV
1675
                    debug!("Sending PeerMessage::BlockNotification");
×
UNCOV
1676
                    peer_state_info.highest_shared_block_height = new_block_height;
×
UNCOV
1677
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
×
UNCOV
1678
                        .await?;
×
UNCOV
1679
                    debug!("Sent PeerMessage::BlockNotification");
×
UNCOV
1680
                }
×
UNCOV
1681
                Ok(KEEP_CONNECTION_ALIVE)
×
1682
            }
UNCOV
1683
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
UNCOV
1684
                // Only ask one of the peers about the batch of blocks
×
UNCOV
1685
                if batch_block_request.peer_addr_target != self.peer_address {
×
UNCOV
1686
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1687
                }
×
UNCOV
1688

×
UNCOV
1689
                let max_response_len = std::cmp::min(
×
UNCOV
1690
                    STANDARD_BLOCK_BATCH_SIZE,
×
UNCOV
1691
                    self.global_state_lock.cli().sync_mode_threshold,
×
UNCOV
1692
                );
×
UNCOV
1693

×
UNCOV
1694
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
UNCOV
1695
                    known_blocks: batch_block_request.known_blocks,
×
1696
                    max_response_len,
×
1697
                    anchor: batch_block_request.anchor_mmr,
×
1698
                }))
×
1699
                .await?;
×
1700

UNCOV
1701
                Ok(KEEP_CONNECTION_ALIVE)
×
1702
            }
1703
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1704
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
UNCOV
1705

×
UNCOV
1706
                if self.peer_address != socket_addr {
×
UNCOV
1707
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1708
                }
×
UNCOV
1709

×
UNCOV
1710
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1711
                    .await?;
×
1712

1713
                // If this peer failed the last synchronization attempt, we only
1714
                // sanction, we don't disconnect.
UNCOV
1715
                Ok(KEEP_CONNECTION_ALIVE)
×
1716
            }
1717
            MainToPeerTask::MakePeerDiscoveryRequest => {
1718
                peer.send(PeerMessage::PeerListRequest).await?;
×
UNCOV
1719
                Ok(KEEP_CONNECTION_ALIVE)
×
1720
            }
UNCOV
1721
            MainToPeerTask::Disconnect(peer_address) => {
×
UNCOV
1722
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
UNCOV
1723

×
UNCOV
1724
                // Only disconnect from the peer the main task requested a disconnect for.
×
UNCOV
1725
                if peer_address != self.peer_address {
×
UNCOV
1726
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1727
                }
×
UNCOV
1728
                self.register_peer_disconnection().await;
×
1729

UNCOV
1730
                Ok(DISCONNECT_CONNECTION)
×
1731
            }
1732
            MainToPeerTask::DisconnectAll() => {
UNCOV
1733
                self.register_peer_disconnection().await;
×
1734

UNCOV
1735
                Ok(DISCONNECT_CONNECTION)
×
1736
            }
UNCOV
1737
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
UNCOV
1738
                if target_socket_addr == self.peer_address {
×
UNCOV
1739
                    peer.send(PeerMessage::PeerListRequest).await?;
×
UNCOV
1740
                }
×
1741
                Ok(KEEP_CONNECTION_ALIVE)
×
1742
            }
1743
            MainToPeerTask::TransactionNotification(transaction_notification) => {
×
1744
                debug!("Sending PeerMessage::TransactionNotification");
×
1745
                peer.send(PeerMessage::TransactionNotification(
×
1746
                    transaction_notification,
×
1747
                ))
×
1748
                .await?;
×
1749
                debug!("Sent PeerMessage::TransactionNotification");
×
1750
                Ok(KEEP_CONNECTION_ALIVE)
×
1751
            }
1752
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1753
                debug!("Sending PeerMessage::BlockProposalNotification");
×
UNCOV
1754
                peer.send(PeerMessage::BlockProposalNotification(
×
UNCOV
1755
                    block_proposal_notification,
×
1756
                ))
×
1757
                .await?;
×
UNCOV
1758
                debug!("Sent PeerMessage::BlockProposalNotification");
×
UNCOV
1759
                Ok(KEEP_CONNECTION_ALIVE)
×
1760
            }
1761
        }
UNCOV
1762
    }
×
1763

1764
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1765
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1766
    async fn run<S>(
59✔
1767
        &mut self,
59✔
1768
        mut peer: S,
59✔
1769
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
59✔
1770
        peer_state_info: &mut MutablePeerState,
59✔
1771
    ) -> Result<()>
59✔
1772
    where
59✔
1773
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
59✔
1774
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
59✔
1775
        <S as TryStream>::Error: std::error::Error,
59✔
1776
    {
59✔
1777
        loop {
1778
            select! {
129✔
1779
                // Handle peer messages
1780
                peer_message = peer.try_next() => {
129✔
1781
                    let peer_address = self.peer_address;
129✔
1782
                    let peer_message = match peer_message {
129✔
1783
                        Ok(message) => message,
129✔
UNCOV
1784
                        Err(err) => {
×
UNCOV
1785
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
UNCOV
1786
                            error!("{msg}. Error: {err}");
×
UNCOV
1787
                            bail!("{msg}. Closing connection.");
×
1788
                        }
1789
                    };
1790
                    let Some(peer_message) = peer_message else {
129✔
UNCOV
1791
                        info!("Peer {peer_address} closed connection.");
×
UNCOV
1792
                        break;
×
1793
                    };
1794

1795
                    let syncing =
129✔
1796
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
129✔
1797
                    let message_type = peer_message.get_type();
129✔
1798
                    if peer_message.ignore_during_sync() && syncing {
129✔
UNCOV
1799
                        debug!(
×
UNCOV
1800
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1801
                        );
UNCOV
1802
                        continue;
×
1803
                    }
129✔
1804
                    if peer_message.ignore_when_not_sync() && !syncing {
129✔
UNCOV
1805
                        debug!(
×
UNCOV
1806
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1807
                        );
UNCOV
1808
                        continue;
×
1809
                    }
129✔
1810

129✔
1811
                    match self
129✔
1812
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
129✔
1813
                        .await
129✔
1814
                    {
1815
                        Ok(false) => {}
70✔
1816
                        Ok(true) => {
1817
                            info!("Closing connection to {peer_address}");
58✔
1818
                            break;
58✔
1819
                        }
1820
                        Err(err) => {
1✔
1821
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1822
                            bail!("{err}");
1✔
1823
                        }
1824
                    };
1825
                }
1826

1827
                // Handle messages from main task
1828
                main_msg_res = from_main_rx.recv() => {
129✔
UNCOV
1829
                    let main_msg = main_msg_res
×
UNCOV
1830
                        .unwrap_or_else(|e| panic!("Failed to read from main loop: {e}"));
×
UNCOV
1831
                    let close_connection = self
×
UNCOV
1832
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
×
UNCOV
1833
                        .await
×
UNCOV
1834
                        .unwrap_or_else(|err| {
×
UNCOV
1835
                            warn!("handle_main_task_message returned an error: {err}");
×
UNCOV
1836
                            true
×
1837
                        });
×
UNCOV
1838

×
UNCOV
1839
                    if close_connection {
×
UNCOV
1840
                        info!(
×
1841
                            "handle_main_task_message is closing the connection to {}",
×
1842
                            self.peer_address
1843
                        );
UNCOV
1844
                        break;
×
UNCOV
1845
                    }
×
1846
                }
1847
            }
1848
        }
1849

1850
        Ok(())
58✔
1851
    }
59✔
1852

1853
    /// Function called before entering the peer loop. Reads the potentially stored
1854
    /// peer standing from the database and does other book-keeping before entering
1855
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1856
    /// accepted for a connection for this loop to be entered. So we don't need
1857
    /// to check the standing again.
1858
    ///
1859
    /// Locking:
1860
    ///   * acquires `global_state_lock` for write
1861
    pub(crate) async fn run_wrapper<S>(
51✔
1862
        &mut self,
51✔
1863
        mut peer: S,
51✔
1864
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
51✔
1865
    ) -> Result<()>
51✔
1866
    where
51✔
1867
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
51✔
1868
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
51✔
1869
        <S as TryStream>::Error: std::error::Error,
51✔
1870
    {
51✔
1871
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1872

1873
        let cli_args = self.global_state_lock.cli().clone();
51✔
1874
        let global_state = self.global_state_lock.lock_guard().await;
51✔
1875

1876
        let standing = global_state
51✔
1877
            .net
51✔
1878
            .peer_databases
51✔
1879
            .peer_standings
51✔
1880
            .get(self.peer_address.ip())
51✔
1881
            .await
51✔
1882
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
51✔
1883

51✔
1884
        // Add peer to peer map
51✔
1885
        let peer_connection_info = PeerConnectionInfo::new(
51✔
1886
            self.peer_handshake_data.listen_port,
51✔
1887
            self.peer_address,
51✔
1888
            self.inbound_connection,
51✔
1889
        );
51✔
1890
        let new_peer = PeerInfo::new(
51✔
1891
            peer_connection_info,
51✔
1892
            &self.peer_handshake_data,
51✔
1893
            SystemTime::now(),
51✔
1894
            cli_args.peer_tolerance,
51✔
1895
        )
51✔
1896
        .with_standing(standing);
51✔
1897

51✔
1898
        // If timestamps are different, we currently just log a warning.
51✔
1899
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
51✔
1900
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
51✔
1901
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
51✔
1902
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
51✔
1903
        {
1904
            let own_datetime_utc: DateTime<Utc> =
×
UNCOV
1905
                new_peer.own_timestamp_connection_established.into();
×
1906
            let peer_datetime_utc: DateTime<Utc> =
×
1907
                new_peer.peer_timestamp_connection_established.into();
×
UNCOV
1908
            warn!(
×
UNCOV
1909
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
UNCOV
1910
                new_peer.connected_address(),
×
UNCOV
1911
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
UNCOV
1912
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1913
        }
51✔
1914

1915
        // There is potential for a race-condition in the peer_map here, as we've previously
1916
        // counted the number of entries and checked if instance ID was already connected. But
1917
        // this check could have been invalidated by other tasks so we perform it again
1918

1919
        if global_state
51✔
1920
            .net
51✔
1921
            .peer_map
51✔
1922
            .values()
51✔
1923
            .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
51✔
1924
        {
UNCOV
1925
            bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1926
        }
51✔
1927

51✔
1928
        if global_state.net.peer_map.len() >= cli_args.max_num_peers {
51✔
UNCOV
1929
            bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1930
        }
51✔
1931

51✔
1932
        if global_state.net.peer_map.contains_key(&self.peer_address) {
51✔
1933
            // This shouldn't be possible, unless the peer reports a different instance ID than
1934
            // for the other connection. Only a malignant client would do that.
UNCOV
1935
            bail!("Already connected to peer. Aborting connection");
×
1936
        }
51✔
1937
        drop(global_state);
51✔
1938

51✔
1939
        self.global_state_lock
51✔
1940
            .lock_mut(|s| s.net.peer_map.insert(self.peer_address, new_peer))
51✔
1941
            .await;
51✔
1942

1943
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1944
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
51✔
1945

51✔
1946
        // If peer indicates more canonical block, request a block notification to catch up ASAP
51✔
1947
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
51✔
1948
            > self
51✔
1949
                .global_state_lock
51✔
1950
                .lock_guard()
51✔
1951
                .await
51✔
1952
                .chain
1953
                .light_state()
51✔
1954
                .kernel
1955
                .header
1956
                .cumulative_proof_of_work
1957
        {
1958
            // Send block notification request to catch up ASAP, in case we're
1959
            // behind the newly-connected peer.
UNCOV
1960
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1961
        }
51✔
1962

1963
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
51✔
1964
        debug!("Exited peer loop for {}", self.peer_address);
51✔
1965

1966
        close_peer_connected_callback(
51✔
1967
            self.global_state_lock.clone(),
51✔
1968
            self.peer_address,
51✔
1969
            &self.to_main_tx,
51✔
1970
        )
51✔
1971
        .await;
51✔
1972

1973
        debug!("Ending peer loop for {}", self.peer_address);
51✔
1974

1975
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1976
        // feature to have for testing purposes.
1977
        res
51✔
1978
    }
51✔
1979

1980
    /// Register graceful peer disconnection in the global state.
1981
    ///
1982
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1983
    ///
1984
    /// # Locking:
1985
    ///   * acquires `global_state_lock` for write
1986
    ///
1987
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
UNCOV
1988
    async fn register_peer_disconnection(&mut self) {
×
UNCOV
1989
        let peer_id = self.peer_handshake_data.instance_id;
×
UNCOV
1990
        self.global_state_lock
×
UNCOV
1991
            .lock_guard_mut()
×
UNCOV
1992
            .await
×
1993
            .net
UNCOV
1994
            .register_peer_disconnection(peer_id, SystemTime::now());
×
UNCOV
1995
    }
×
1996
}
1997

1998
#[cfg(test)]
1999
mod peer_loop_tests {
2000
    use rand::rngs::StdRng;
2001
    use rand::Rng;
2002
    use rand::SeedableRng;
2003
    use tokio::sync::mpsc::error::TryRecvError;
2004
    use tracing_test::traced_test;
2005

2006
    use super::*;
2007
    use crate::config_models::cli_args;
2008
    use crate::config_models::network::Network;
2009
    use crate::job_queue::triton_vm::TritonVmJobQueue;
2010
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
2011
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
2012
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
2013
    use crate::models::peer::transaction_notification::TransactionNotification;
2014
    use crate::models::state::mempool::TransactionOrigin;
2015
    use crate::models::state::tx_proving_capability::TxProvingCapability;
2016
    use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
2017
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
2018
    use crate::tests::shared::fake_valid_block_for_tests;
2019
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
2020
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
2021
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
2022
    use crate::tests::shared::get_dummy_socket_address;
2023
    use crate::tests::shared::get_test_genesis_setup;
2024
    use crate::tests::shared::Action;
2025
    use crate::tests::shared::Mock;
2026

UNCOV
2027
    #[traced_test]
×
2028
    #[tokio::test]
2029
    async fn test_peer_loop_bye() -> Result<()> {
1✔
2030
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2031

1✔
2032
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2033
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1✔
2034

1✔
2035
        let peer_address = get_dummy_socket_address(2);
1✔
2036
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2037
        let mut peer_loop_handler =
1✔
2038
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1✔
2039
        peer_loop_handler
1✔
2040
            .run_wrapper(mock, from_main_rx_clone)
1✔
2041
            .await?;
1✔
2042

1✔
2043
        assert_eq!(
1✔
2044
            2,
1✔
2045
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
2046
            "peer map length must be back to 2 after goodbye"
1✔
2047
        );
1✔
2048

1✔
2049
        Ok(())
1✔
2050
    }
1✔
2051

UNCOV
2052
    #[traced_test]
×
2053
    #[tokio::test]
2054
    async fn test_peer_loop_peer_list() {
1✔
2055
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
1✔
2056
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
1✔
2057
                .await
1✔
2058
                .unwrap();
1✔
2059

1✔
2060
        let mut peer_infos = state_lock
1✔
2061
            .lock_guard()
1✔
2062
            .await
1✔
2063
            .net
1✔
2064
            .peer_map
1✔
2065
            .clone()
1✔
2066
            .into_values()
1✔
2067
            .collect::<Vec<_>>();
1✔
2068
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2✔
2069
        let (peer_address0, instance_id0) = (
1✔
2070
            peer_infos[0].connected_address(),
1✔
2071
            peer_infos[0].instance_id(),
1✔
2072
        );
1✔
2073
        let (peer_address1, instance_id1) = (
1✔
2074
            peer_infos[1].connected_address(),
1✔
2075
            peer_infos[1].instance_id(),
1✔
2076
        );
1✔
2077

1✔
2078
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
1✔
2079
        let expected_response = vec![
1✔
2080
            (peer_address0, instance_id0),
1✔
2081
            (peer_address1, instance_id1),
1✔
2082
            (sa2, hsd2.instance_id),
1✔
2083
        ];
1✔
2084
        let mock = Mock::new(vec![
1✔
2085
            Action::Read(PeerMessage::PeerListRequest),
1✔
2086
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
1✔
2087
            Action::Read(PeerMessage::Bye),
1✔
2088
        ]);
1✔
2089

1✔
2090
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2091

1✔
2092
        let mut peer_loop_handler =
1✔
2093
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
1✔
2094
        peer_loop_handler
1✔
2095
            .run_wrapper(mock, from_main_rx_clone)
1✔
2096
            .await
1✔
2097
            .unwrap();
1✔
2098

1✔
2099
        assert_eq!(
1✔
2100
            2,
1✔
2101
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
2102
            "peer map must have length 2 after saying goodbye to peer 2"
1✔
2103
        );
1✔
2104
    }
1✔
2105

UNCOV
2106
    #[traced_test]
×
2107
    #[tokio::test]
2108
    async fn different_genesis_test() -> Result<()> {
1✔
2109
        // In this scenario a peer provides another genesis block than what has been
1✔
2110
        // hardcoded. This should lead to the closing of the connection to this peer
1✔
2111
        // and a ban.
1✔
2112

1✔
2113
        let network = Network::Main;
1✔
2114
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2115
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2116
        assert_eq!(1000, state_lock.cli().peer_tolerance);
1✔
2117
        let peer_address = get_dummy_socket_address(0);
1✔
2118

1✔
2119
        // Although the database is empty, `get_latest_block` still returns the genesis block,
1✔
2120
        // since that block is hardcoded.
1✔
2121
        let mut different_genesis_block = state_lock
1✔
2122
            .lock_guard()
1✔
2123
            .await
1✔
2124
            .chain
1✔
2125
            .archival_state()
1✔
2126
            .get_tip()
1✔
2127
            .await;
1✔
2128

1✔
2129
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
1✔
2130
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
1✔
2131
            &different_genesis_block,
1✔
2132
            Timestamp::hours(1),
1✔
2133
            StdRng::seed_from_u64(5550001).random(),
1✔
2134
        )
1✔
2135
        .await;
1✔
2136
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
1✔
2137
            block_1_with_different_genesis.try_into().unwrap(),
1✔
2138
        )))]);
1✔
2139

1✔
2140
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2141
            to_main_tx.clone(),
1✔
2142
            state_lock.clone(),
1✔
2143
            peer_address,
1✔
2144
            hsd,
1✔
2145
            true,
1✔
2146
            1,
1✔
2147
        );
1✔
2148
        let res = peer_loop_handler
1✔
2149
            .run_wrapper(mock, from_main_rx_clone)
1✔
2150
            .await;
1✔
2151
        assert!(
1✔
2152
            res.is_err(),
1✔
2153
            "run_wrapper must return failure when genesis is different"
1✔
2154
        );
1✔
2155

1✔
2156
        match to_main_rx1.recv().await {
1✔
2157
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2158
            _ => bail!("Must receive remove of peer block max height"),
1✔
2159
        }
1✔
2160

1✔
2161
        // Verify that no further message was sent to main loop
1✔
2162
        match to_main_rx1.try_recv() {
1✔
2163
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2164
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2165
        };
1✔
2166

1✔
2167
        drop(to_main_tx);
1✔
2168

1✔
2169
        let peer_standing = state_lock
1✔
2170
            .lock_guard()
1✔
2171
            .await
1✔
2172
            .net
1✔
2173
            .get_peer_standing_from_database(peer_address.ip())
1✔
2174
            .await;
1✔
2175
        assert_eq!(
1✔
2176
            -i32::from(state_lock.cli().peer_tolerance),
1✔
2177
            peer_standing.unwrap().standing
1✔
2178
        );
1✔
2179
        assert_eq!(
1✔
2180
            NegativePeerSanction::DifferentGenesis,
1✔
2181
            peer_standing.unwrap().latest_punishment.unwrap().0
1✔
2182
        );
1✔
2183

1✔
2184
        Ok(())
1✔
2185
    }
1✔
2186

UNCOV
2187
    #[traced_test]
×
2188
    #[tokio::test]
2189
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
1✔
2190
    {
1✔
2191
        let args = cli_args::Args::default();
1✔
2192
        let network = args.network;
1✔
2193
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
1✔
2194
            get_test_genesis_setup(network, 0, args).await?;
1✔
2195

1✔
2196
        let peer_address = get_dummy_socket_address(0);
1✔
2197
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
1✔
2198
        let peer_id = peer_handshake_data.instance_id;
1✔
2199
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2200
            to_main_tx,
1✔
2201
            state_lock.clone(),
1✔
2202
            peer_address,
1✔
2203
            peer_handshake_data,
1✔
2204
            true,
1✔
2205
            1,
1✔
2206
        );
1✔
2207
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2208
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
1✔
2209

1✔
2210
        let global_state = state_lock.lock_guard().await;
1✔
2211
        assert!(global_state
1✔
2212
            .net
1✔
2213
            .last_disconnection_time_of_peer(peer_id)
1✔
2214
            .is_none());
1✔
2215

1✔
2216
        drop(to_main_rx);
1✔
2217
        drop(from_main_tx);
1✔
2218

1✔
2219
        Ok(())
1✔
2220
    }
1✔
2221

UNCOV
2222
    #[traced_test]
×
2223
    #[tokio::test]
2224
    async fn block_without_valid_pow_test() -> Result<()> {
1✔
2225
        // In this scenario, a block without a valid PoW is received. This block should be rejected
1✔
2226
        // by the peer loop and a notification should never reach the main loop.
1✔
2227

1✔
2228
        let network = Network::Main;
1✔
2229
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2230
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2231
        let peer_address = get_dummy_socket_address(0);
1✔
2232
        let genesis_block: Block = state_lock
1✔
2233
            .lock_guard()
1✔
2234
            .await
1✔
2235
            .chain
1✔
2236
            .archival_state()
1✔
2237
            .get_tip()
1✔
2238
            .await;
1✔
2239

1✔
2240
        // Make a with hash above what the implied threshold from
1✔
2241
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
1✔
2242
            &genesis_block,
1✔
2243
            Timestamp::hours(1),
1✔
2244
            StdRng::seed_from_u64(5550001).random(),
1✔
2245
        )
1✔
2246
        .await;
1✔
2247

1✔
2248
        // This *probably* is invalid PoW -- and needs to be for this test to
1✔
2249
        // work.
1✔
2250
        block_without_valid_pow.set_header_nonce(Digest::default());
1✔
2251

1✔
2252
        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
1✔
2253
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
1✔
2254
        let mock = Mock::new(vec![
1✔
2255
            Action::Read(PeerMessage::Block(Box::new(
1✔
2256
                block_without_valid_pow.clone().try_into().unwrap(),
1✔
2257
            ))),
1✔
2258
            Action::Read(PeerMessage::Bye),
1✔
2259
        ]);
1✔
2260

1✔
2261
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2262

1✔
2263
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2264
            to_main_tx.clone(),
1✔
2265
            state_lock.clone(),
1✔
2266
            peer_address,
1✔
2267
            hsd,
1✔
2268
            true,
1✔
2269
            1,
1✔
2270
            block_without_valid_pow.header().timestamp,
1✔
2271
        );
1✔
2272
        peer_loop_handler
1✔
2273
            .run_wrapper(mock, from_main_rx_clone)
1✔
2274
            .await
1✔
2275
            .expect("sending (one) invalid block should not result in closed connection");
1✔
2276

1✔
2277
        match to_main_rx1.recv().await {
1✔
2278
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2279
            _ => bail!("Must receive remove of peer block max height"),
1✔
2280
        }
1✔
2281

1✔
2282
        // Verify that no further message was sent to main loop
1✔
2283
        match to_main_rx1.try_recv() {
1✔
2284
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2285
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2286
        };
1✔
2287

1✔
2288
        // We need to have the transmitter in scope until we have received from it
1✔
2289
        // otherwise the receiver will report the disconnected error when we attempt
1✔
2290
        // to read from it. And the purpose is to verify that the channel is empty,
1✔
2291
        // not that it has been closed.
1✔
2292
        drop(to_main_tx);
1✔
2293

1✔
2294
        // Verify that peer standing was stored in database
1✔
2295
        let standing = state_lock
1✔
2296
            .lock_guard()
1✔
2297
            .await
1✔
2298
            .net
1✔
2299
            .peer_databases
1✔
2300
            .peer_standings
1✔
2301
            .get(peer_address.ip())
1✔
2302
            .await
1✔
2303
            .unwrap();
1✔
2304
        assert!(
1✔
2305
            standing.standing < 0,
1✔
2306
            "Peer must be sanctioned for sending a bad block"
1✔
2307
        );
1✔
2308

1✔
2309
        Ok(())
1✔
2310
    }
1✔
2311

UNCOV
2312
    #[traced_test]
×
2313
    #[tokio::test]
2314
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
1✔
2315
        // The scenario tested here is that a client receives a block that is already
1✔
2316
        // known and stored. The expected behavior is to ignore the block and not send
1✔
2317
        // a message to the main task.
1✔
2318

1✔
2319
        let network = Network::Main;
1✔
2320
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
1✔
2321
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2322
        let peer_address = get_dummy_socket_address(0);
1✔
2323
        let genesis_block: Block = Block::genesis(network);
1✔
2324

1✔
2325
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
2326
        let block_1 =
1✔
2327
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
1✔
2328
        assert!(
1✔
2329
            block_1.is_valid(&genesis_block, now).await,
1✔
2330
            "Block must be valid for this test to make sense"
1✔
2331
        );
1✔
2332
        alice.set_new_tip(block_1.clone()).await?;
1✔
2333

1✔
2334
        let mock_peer_messages = Mock::new(vec![
1✔
2335
            Action::Read(PeerMessage::Block(Box::new(
1✔
2336
                block_1.clone().try_into().unwrap(),
1✔
2337
            ))),
1✔
2338
            Action::Read(PeerMessage::Bye),
1✔
2339
        ]);
1✔
2340

1✔
2341
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2342

1✔
2343
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2344
            to_main_tx.clone(),
1✔
2345
            alice.clone(),
1✔
2346
            peer_address,
1✔
2347
            hsd,
1✔
2348
            false,
1✔
2349
            1,
1✔
2350
            block_1.header().timestamp,
1✔
2351
        );
1✔
2352
        alice_peer_loop_handler
1✔
2353
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
1✔
2354
            .await?;
1✔
2355

1✔
2356
        match to_main_rx1.recv().await {
1✔
2357
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2358
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
1✔
2359
        }
1✔
2360
        match to_main_rx1.try_recv() {
1✔
2361
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2362
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2363
        };
1✔
2364
        drop(to_main_tx);
1✔
2365

1✔
2366
        if !alice.lock_guard().await.net.peer_map.is_empty() {
1✔
2367
            bail!("peer map must be empty after closing connection gracefully");
1✔
2368
        }
1✔
2369

1✔
2370
        Ok(())
1✔
2371
    }
1✔
2372

UNCOV
2373
    #[traced_test]
×
2374
    #[tokio::test]
2375
    async fn block_request_batch_simple() {
1✔
2376
        // Scenario: Six blocks (including genesis) are known. Peer requests
1✔
2377
        // from all possible starting points, and client responds with the
1✔
2378
        // correct list of blocks.
1✔
2379
        let network = Network::Main;
1✔
2380
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2381
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2382
                .await
1✔
2383
                .unwrap();
1✔
2384
        let genesis_block: Block = Block::genesis(network);
1✔
2385
        let peer_address = get_dummy_socket_address(0);
1✔
2386
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
2387
            fake_valid_sequence_of_blocks_for_tests(
1✔
2388
                &genesis_block,
1✔
2389
                Timestamp::hours(1),
1✔
2390
                StdRng::seed_from_u64(5550001).random(),
1✔
2391
            )
1✔
2392
            .await;
1✔
2393
        let blocks = vec![
1✔
2394
            genesis_block,
1✔
2395
            block_1,
1✔
2396
            block_2,
1✔
2397
            block_3,
1✔
2398
            block_4,
1✔
2399
            block_5.clone(),
1✔
2400
        ];
1✔
2401
        for block in blocks.iter().skip(1) {
5✔
2402
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
5✔
2403
        }
1✔
2404

1✔
2405
        let mmra = state_lock
1✔
2406
            .lock_guard()
1✔
2407
            .await
1✔
2408
            .chain
1✔
2409
            .archival_state()
1✔
2410
            .archival_block_mmr
1✔
2411
            .ammr()
1✔
2412
            .to_accumulator_async()
1✔
2413
            .await;
1✔
2414
        for i in 0..=4 {
6✔
2415
            let expected_response = {
5✔
2416
                let state = state_lock.lock_guard().await;
5✔
2417
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
5✔
2418
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
5✔
2419
                    .await
5✔
2420
                    .unwrap()
5✔
2421
            };
5✔
2422
            let mock = Mock::new(vec![
5✔
2423
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
5✔
2424
                    known_blocks: vec![blocks[i].hash()],
5✔
2425
                    max_response_len: 14,
5✔
2426
                    anchor: mmra.clone(),
5✔
2427
                })),
5✔
2428
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
5✔
2429
                Action::Read(PeerMessage::Bye),
5✔
2430
            ]);
5✔
2431
            let mut peer_loop_handler = PeerLoopHandler::new(
5✔
2432
                to_main_tx.clone(),
5✔
2433
                state_lock.clone(),
5✔
2434
                peer_address,
5✔
2435
                hsd.clone(),
5✔
2436
                false,
5✔
2437
                1,
5✔
2438
            );
5✔
2439

5✔
2440
            peer_loop_handler
5✔
2441
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
5✔
2442
                .await
5✔
2443
                .unwrap();
5✔
2444
        }
1✔
2445
    }
1✔
2446

UNCOV
2447
    #[traced_test]
×
2448
    #[tokio::test]
2449
    async fn block_request_batch_in_order_test() -> Result<()> {
1✔
2450
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2451
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
1✔
2452
        // are returned.
1✔
2453

1✔
2454
        let network = Network::Main;
1✔
2455
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2456
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2457
        let genesis_block: Block = Block::genesis(network);
1✔
2458
        let peer_address = get_dummy_socket_address(0);
1✔
2459
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2460
            &genesis_block,
1✔
2461
            Timestamp::hours(1),
1✔
2462
            StdRng::seed_from_u64(5550001).random(),
1✔
2463
        )
1✔
2464
        .await;
1✔
2465
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2466
            &block_1,
1✔
2467
            Timestamp::hours(1),
1✔
2468
            StdRng::seed_from_u64(5550002).random(),
1✔
2469
        )
1✔
2470
        .await;
1✔
2471
        assert_ne!(block_2_b.hash(), block_2_a.hash());
1✔
2472

1✔
2473
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2474
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2475
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2476
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2477
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2478

1✔
2479
        let anchor = state_lock
1✔
2480
            .lock_guard()
1✔
2481
            .await
1✔
2482
            .chain
1✔
2483
            .archival_state()
1✔
2484
            .archival_block_mmr
1✔
2485
            .ammr()
1✔
2486
            .to_accumulator_async()
1✔
2487
            .await;
1✔
2488
        let response_1 = {
1✔
2489
            let state_lock = state_lock.lock_guard().await;
1✔
2490
            PeerLoopHandler::batch_response(
1✔
2491
                &state_lock,
1✔
2492
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
1✔
2493
                &anchor,
1✔
2494
            )
1✔
2495
            .await
1✔
2496
            .unwrap()
1✔
2497
        };
1✔
2498

1✔
2499
        let mut mock = Mock::new(vec![
1✔
2500
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2501
                known_blocks: vec![genesis_block.hash()],
1✔
2502
                max_response_len: 14,
1✔
2503
                anchor: anchor.clone(),
1✔
2504
            })),
1✔
2505
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
1✔
2506
            Action::Read(PeerMessage::Bye),
1✔
2507
        ]);
1✔
2508

1✔
2509
        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
1✔
2510
            to_main_tx.clone(),
1✔
2511
            state_lock.clone(),
1✔
2512
            peer_address,
1✔
2513
            hsd.clone(),
1✔
2514
            false,
1✔
2515
            1,
1✔
2516
            block_3_a.header().timestamp,
1✔
2517
        );
1✔
2518

1✔
2519
        peer_loop_handler_1
1✔
2520
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
1✔
2521
            .await?;
1✔
2522

1✔
2523
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2524
        let response_2 = {
1✔
2525
            let state_lock = state_lock.lock_guard().await;
1✔
2526
            PeerLoopHandler::batch_response(
1✔
2527
                &state_lock,
1✔
2528
                vec![block_2_a, block_3_a.clone()],
1✔
2529
                &anchor,
1✔
2530
            )
1✔
2531
            .await
1✔
2532
            .unwrap()
1✔
2533
        };
1✔
2534
        mock = Mock::new(vec![
1✔
2535
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2536
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
1✔
2537
                max_response_len: 14,
1✔
2538
                anchor,
1✔
2539
            })),
1✔
2540
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
1✔
2541
            Action::Read(PeerMessage::Bye),
1✔
2542
        ]);
1✔
2543

1✔
2544
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2545
            to_main_tx.clone(),
1✔
2546
            state_lock.clone(),
1✔
2547
            peer_address,
1✔
2548
            hsd,
1✔
2549
            false,
1✔
2550
            1,
1✔
2551
            block_3_a.header().timestamp,
1✔
2552
        );
1✔
2553

1✔
2554
        peer_loop_handler_2
1✔
2555
            .run_wrapper(mock, from_main_rx_clone)
1✔
2556
            .await?;
1✔
2557

1✔
2558
        Ok(())
1✔
2559
    }
1✔
2560

UNCOV
2561
    #[traced_test]
×
2562
    #[tokio::test]
2563
    async fn block_request_batch_out_of_order_test() -> Result<()> {
1✔
2564
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2565
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
1✔
2566
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
1✔
2567
        // The blocks will be supplied in the correct order but starting from the first digest in
1✔
2568
        // the list that is known and canonical.
1✔
2569

1✔
2570
        let network = Network::Main;
1✔
2571
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2572
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2573
        let genesis_block = Block::genesis(network);
1✔
2574
        let peer_address = get_dummy_socket_address(0);
1✔
2575
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2576
            &genesis_block,
1✔
2577
            Timestamp::hours(1),
1✔
2578
            StdRng::seed_from_u64(5550001).random(),
1✔
2579
        )
1✔
2580
        .await;
1✔
2581
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2582
            &block_1,
1✔
2583
            Timestamp::hours(1),
1✔
2584
            StdRng::seed_from_u64(5550002).random(),
1✔
2585
        )
1✔
2586
        .await;
1✔
2587
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2588

1✔
2589
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2590
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2591
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2592
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2593
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2594

1✔
2595
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2596
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
1✔
2597
        expected_anchor.append(block_3_a.hash());
1✔
2598
        let state_anchor = state_lock
1✔
2599
            .lock_guard()
1✔
2600
            .await
1✔
2601
            .chain
1✔
2602
            .archival_state()
1✔
2603
            .archival_block_mmr
1✔
2604
            .ammr()
1✔
2605
            .to_accumulator_async()
1✔
2606
            .await;
1✔
2607
        assert_eq!(
1✔
2608
            expected_anchor, state_anchor,
1✔
2609
            "Catching assumption about MMRA in tip and in archival state"
1✔
2610
        );
1✔
2611

1✔
2612
        let response = {
1✔
2613
            let state_lock = state_lock.lock_guard().await;
1✔
2614
            PeerLoopHandler::batch_response(
1✔
2615
                &state_lock,
1✔
2616
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
1✔
2617
                &expected_anchor,
1✔
2618
            )
1✔
2619
            .await
1✔
2620
            .unwrap()
1✔
2621
        };
1✔
2622
        let mock = Mock::new(vec![
1✔
2623
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2624
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
1✔
2625
                max_response_len: 14,
1✔
2626
                anchor: expected_anchor,
1✔
2627
            })),
1✔
2628
            // Since genesis block is the 1st known in the list of known blocks,
1✔
2629
            // it's immediate descendent, block_1, is the first one returned.
1✔
2630
            Action::Write(PeerMessage::BlockResponseBatch(response)),
1✔
2631
            Action::Read(PeerMessage::Bye),
1✔
2632
        ]);
1✔
2633

1✔
2634
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2635
            to_main_tx.clone(),
1✔
2636
            state_lock.clone(),
1✔
2637
            peer_address,
1✔
2638
            hsd,
1✔
2639
            false,
1✔
2640
            1,
1✔
2641
            block_3_a.header().timestamp,
1✔
2642
        );
1✔
2643

1✔
2644
        peer_loop_handler_2
1✔
2645
            .run_wrapper(mock, from_main_rx_clone)
1✔
2646
            .await?;
1✔
2647

1✔
2648
        Ok(())
1✔
2649
    }
1✔
2650

UNCOV
2651
    #[traced_test]
×
2652
    #[tokio::test]
2653
    async fn request_unknown_height_doesnt_crash() {
1✔
2654
        // Scenario: Only genesis block is known. Peer requests block of height
1✔
2655
        // 2.
1✔
2656
        let network = Network::Main;
1✔
2657
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2658
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2659
                .await
1✔
2660
                .unwrap();
1✔
2661
        let peer_address = get_dummy_socket_address(0);
1✔
2662
        let mock = Mock::new(vec![
1✔
2663
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2664
            Action::Read(PeerMessage::Bye),
1✔
2665
        ]);
1✔
2666

1✔
2667
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2668
            to_main_tx.clone(),
1✔
2669
            state_lock.clone(),
1✔
2670
            peer_address,
1✔
2671
            hsd,
1✔
2672
            false,
1✔
2673
            1,
1✔
2674
        );
1✔
2675

1✔
2676
        // This will return error if seen read/write order does not match that of the
1✔
2677
        // mocked object.
1✔
2678
        peer_loop_handler
1✔
2679
            .run_wrapper(mock, from_main_rx_clone)
1✔
2680
            .await
1✔
2681
            .unwrap();
1✔
2682

1✔
2683
        // Verify that peer is sanctioned for this nonsense.
1✔
2684
        assert!(state_lock
1✔
2685
            .lock_guard()
1✔
2686
            .await
1✔
2687
            .net
1✔
2688
            .get_peer_standing_from_database(peer_address.ip())
1✔
2689
            .await
1✔
2690
            .unwrap()
1✔
2691
            .standing
1✔
2692
            .is_negative());
1✔
2693
    }
1✔
2694

UNCOV
2695
    #[traced_test]
×
2696
    #[tokio::test]
2697
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
1✔
2698
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2699
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
1✔
2700
        // returned.
1✔
2701

1✔
2702
        let network = Network::Main;
1✔
2703
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2704
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2705
        let genesis_block = Block::genesis(network);
1✔
2706
        let peer_address = get_dummy_socket_address(0);
1✔
2707

1✔
2708
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2709
            &genesis_block,
1✔
2710
            Timestamp::hours(1),
1✔
2711
            StdRng::seed_from_u64(5550001).random(),
1✔
2712
        )
1✔
2713
        .await;
1✔
2714
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2715
            &block_1,
1✔
2716
            Timestamp::hours(1),
1✔
2717
            StdRng::seed_from_u64(5550002).random(),
1✔
2718
        )
1✔
2719
        .await;
1✔
2720
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2721

1✔
2722
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2723
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2724
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2725
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2726
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2727

1✔
2728
        let mock = Mock::new(vec![
1✔
2729
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2730
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
1✔
2731
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
1✔
2732
            Action::Write(PeerMessage::Block(Box::new(
1✔
2733
                block_3_a.clone().try_into().unwrap(),
1✔
2734
            ))),
1✔
2735
            Action::Read(PeerMessage::Bye),
1✔
2736
        ]);
1✔
2737

1✔
2738
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2739
            to_main_tx.clone(),
1✔
2740
            state_lock.clone(),
1✔
2741
            peer_address,
1✔
2742
            hsd,
1✔
2743
            false,
1✔
2744
            1,
1✔
2745
            block_3_a.header().timestamp,
1✔
2746
        );
1✔
2747

1✔
2748
        // This will return error if seen read/write order does not match that of the
1✔
2749
        // mocked object.
1✔
2750
        peer_loop_handler
1✔
2751
            .run_wrapper(mock, from_main_rx_clone)
1✔
2752
            .await?;
1✔
2753

1✔
2754
        Ok(())
1✔
2755
    }
1✔
2756

UNCOV
2757
    #[traced_test]
×
2758
    #[tokio::test]
2759
    async fn receival_of_block_notification_height_1() {
1✔
2760
        // Scenario: client only knows genesis block. Then receives block
1✔
2761
        // notification of height 1. Must request block 1.
1✔
2762
        let network = Network::Main;
1✔
2763
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2764
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
1✔
2765
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2766
                .await
1✔
2767
                .unwrap();
1✔
2768
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2769
        let notification_height1 = (&block_1).into();
1✔
2770
        let mock = Mock::new(vec![
1✔
2771
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
1✔
2772
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
1✔
2773
            Action::Read(PeerMessage::Bye),
1✔
2774
        ]);
1✔
2775

1✔
2776
        let peer_address = get_dummy_socket_address(0);
1✔
2777
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2778
            to_main_tx.clone(),
1✔
2779
            state_lock.clone(),
1✔
2780
            peer_address,
1✔
2781
            hsd,
1✔
2782
            false,
1✔
2783
            1,
1✔
2784
            block_1.header().timestamp,
1✔
2785
        );
1✔
2786
        peer_loop_handler
1✔
2787
            .run_wrapper(mock, from_main_rx_clone)
1✔
2788
            .await
1✔
2789
            .unwrap();
1✔
2790

1✔
2791
        drop(to_main_rx1);
1✔
2792
    }
1✔
2793

UNCOV
2794
    #[traced_test]
×
2795
    #[tokio::test]
2796
    async fn receive_block_request_by_height_block_7() {
1✔
2797
        // Scenario: client only knows blocks up to height 7. Then receives block-
1✔
2798
        // request-by-height for height 7. Must respond with block 7.
1✔
2799
        let network = Network::Main;
1✔
2800
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2801
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
1✔
2802
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2803
                .await
1✔
2804
                .unwrap();
1✔
2805
        let genesis_block = Block::genesis(network);
1✔
2806
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
1✔
2807
            &genesis_block,
1✔
2808
            Timestamp::hours(1),
1✔
2809
            rng.random(),
1✔
2810
        )
1✔
2811
        .await;
1✔
2812
        let block7 = blocks.last().unwrap().to_owned();
1✔
2813
        let tip_height: u64 = block7.header().height.into();
1✔
2814
        assert_eq!(7, tip_height);
1✔
2815

1✔
2816
        for block in &blocks {
8✔
2817
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
7✔
2818
        }
1✔
2819

1✔
2820
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
1✔
2821
        let mock = Mock::new(vec![
1✔
2822
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
1✔
2823
            Action::Write(block7_response),
1✔
2824
            Action::Read(PeerMessage::Bye),
1✔
2825
        ]);
1✔
2826

1✔
2827
        let peer_address = get_dummy_socket_address(0);
1✔
2828
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2829
            to_main_tx.clone(),
1✔
2830
            state_lock.clone(),
1✔
2831
            peer_address,
1✔
2832
            hsd,
1✔
2833
            false,
1✔
2834
            1,
1✔
2835
        );
1✔
2836
        peer_loop_handler
1✔
2837
            .run_wrapper(mock, from_main_rx_clone)
1✔
2838
            .await
1✔
2839
            .unwrap();
1✔
2840

1✔
2841
        drop(to_main_rx1);
1✔
2842
    }
1✔
2843

UNCOV
2844
    #[traced_test]
×
2845
    #[tokio::test]
2846
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
1✔
2847
        // Scenario: client only knows genesis block. Then receives block 1.
1✔
2848

1✔
2849
        let network = Network::Main;
1✔
2850
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2851
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2852
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2853
        let peer_address = get_dummy_socket_address(0);
1✔
2854

1✔
2855
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2856
        let mock = Mock::new(vec![
1✔
2857
            Action::Read(PeerMessage::Block(Box::new(
1✔
2858
                block_1.clone().try_into().unwrap(),
1✔
2859
            ))),
1✔
2860
            Action::Read(PeerMessage::Bye),
1✔
2861
        ]);
1✔
2862

1✔
2863
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2864
            to_main_tx.clone(),
1✔
2865
            state_lock.clone(),
1✔
2866
            peer_address,
1✔
2867
            hsd,
1✔
2868
            false,
1✔
2869
            1,
1✔
2870
            block_1.header().timestamp,
1✔
2871
        );
1✔
2872
        peer_loop_handler
1✔
2873
            .run_wrapper(mock, from_main_rx_clone)
1✔
2874
            .await?;
1✔
2875

1✔
2876
        // Verify that a block was sent to `main_loop`
1✔
2877
        match to_main_rx1.recv().await {
1✔
2878
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
1✔
2879
            _ => bail!("Did not find msg sent to main task"),
1✔
2880
        };
1✔
2881

1✔
2882
        match to_main_rx1.recv().await {
1✔
2883
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2884
            _ => bail!("Must receive remove of peer block max height"),
1✔
2885
        }
1✔
2886

1✔
2887
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2888
            bail!("peer map must be empty after closing connection gracefully");
1✔
2889
        }
1✔
2890

1✔
2891
        Ok(())
1✔
2892
    }
1✔
2893

UNCOV
2894
    #[traced_test]
×
2895
    #[tokio::test]
2896
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
1✔
2897
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
2898
        // receives block 2, meaning that block 1 will have to be requested.
1✔
2899

1✔
2900
        let network = Network::Main;
1✔
2901
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2902
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2903
        let peer_address = get_dummy_socket_address(0);
1✔
2904
        let genesis_block: Block = state_lock
1✔
2905
            .lock_guard()
1✔
2906
            .await
1✔
2907
            .chain
1✔
2908
            .archival_state()
1✔
2909
            .get_tip()
1✔
2910
            .await;
1✔
2911
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
1✔
2912
            &genesis_block,
1✔
2913
            Timestamp::hours(1),
1✔
2914
            StdRng::seed_from_u64(5550001).random(),
1✔
2915
        )
1✔
2916
        .await;
1✔
2917

1✔
2918
        let mock = Mock::new(vec![
1✔
2919
            Action::Read(PeerMessage::Block(Box::new(
1✔
2920
                block_2.clone().try_into().unwrap(),
1✔
2921
            ))),
1✔
2922
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
2923
            Action::Read(PeerMessage::Block(Box::new(
1✔
2924
                block_1.clone().try_into().unwrap(),
1✔
2925
            ))),
1✔
2926
            Action::Read(PeerMessage::Bye),
1✔
2927
        ]);
1✔
2928

1✔
2929
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2930
            to_main_tx.clone(),
1✔
2931
            state_lock.clone(),
1✔
2932
            peer_address,
1✔
2933
            hsd,
1✔
2934
            true,
1✔
2935
            1,
1✔
2936
            block_2.header().timestamp,
1✔
2937
        );
1✔
2938
        peer_loop_handler
1✔
2939
            .run_wrapper(mock, from_main_rx_clone)
1✔
2940
            .await?;
1✔
2941

1✔
2942
        match to_main_rx1.recv().await {
1✔
2943
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
2944
                if blocks[0].hash() != block_1.hash() {
1✔
2945
                    bail!("1st received block by main loop must be block 1");
1✔
2946
                }
1✔
2947
                if blocks[1].hash() != block_2.hash() {
1✔
2948
                    bail!("2nd received block by main loop must be block 2");
1✔
2949
                }
1✔
2950
            }
1✔
2951
            _ => bail!("Did not find msg sent to main task 1"),
1✔
2952
        };
1✔
2953
        match to_main_rx1.recv().await {
1✔
2954
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2955
            _ => bail!("Must receive remove of peer block max height"),
1✔
2956
        }
1✔
2957

1✔
2958
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2959
            bail!("peer map must be empty after closing connection gracefully");
1✔
2960
        }
1✔
2961

1✔
2962
        Ok(())
1✔
2963
    }
1✔
2964

UNCOV
2965
    #[traced_test]
×
2966
    #[tokio::test]
2967
    async fn prevent_ram_exhaustion_test() -> Result<()> {
1✔
2968
        // In this scenario the peer sends more blocks than the client allows to store in the
1✔
2969
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
1✔
2970
        // process as the alternative is that the program will crash because it runs out of RAM.
1✔
2971

1✔
2972
        let network = Network::Main;
1✔
2973
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2974
        let (
1✔
2975
            _peer_broadcast_tx,
1✔
2976
            from_main_rx_clone,
1✔
2977
            to_main_tx,
1✔
2978
            mut to_main_rx1,
1✔
2979
            mut state_lock,
1✔
2980
            _hsd,
1✔
2981
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
2982
        let genesis_block = Block::genesis(network);
1✔
2983

1✔
2984
        // Restrict max number of blocks held in memory to 2.
1✔
2985
        let mut cli = state_lock.cli().clone();
1✔
2986
        cli.sync_mode_threshold = 2;
1✔
2987
        state_lock.set_cli(cli).await;
1✔
2988

1✔
2989
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
1✔
2990
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2991
            &genesis_block,
1✔
2992
            Timestamp::hours(1),
1✔
2993
            rng.random(),
1✔
2994
        )
1✔
2995
        .await;
1✔
2996
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2997

1✔
2998
        let mock = Mock::new(vec![
1✔
2999
            Action::Read(PeerMessage::Block(Box::new(
1✔
3000
                block_4.clone().try_into().unwrap(),
1✔
3001
            ))),
1✔
3002
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3003
            Action::Read(PeerMessage::Block(Box::new(
1✔
3004
                block_3.clone().try_into().unwrap(),
1✔
3005
            ))),
1✔
3006
            Action::Read(PeerMessage::Bye),
1✔
3007
        ]);
1✔
3008

1✔
3009
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3010
            to_main_tx.clone(),
1✔
3011
            state_lock.clone(),
1✔
3012
            peer_address1,
1✔
3013
            hsd1,
1✔
3014
            true,
1✔
3015
            1,
1✔
3016
            block_4.header().timestamp,
1✔
3017
        );
1✔
3018
        peer_loop_handler
1✔
3019
            .run_wrapper(mock, from_main_rx_clone)
1✔
3020
            .await?;
1✔
3021

1✔
3022
        match to_main_rx1.recv().await {
1✔
3023
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3024
            _ => bail!("Must receive remove of peer block max height"),
1✔
3025
        }
1✔
3026

1✔
3027
        // Verify that no block is sent to main loop.
1✔
3028
        match to_main_rx1.try_recv() {
1✔
3029
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
3030
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
1✔
3031
        };
1✔
3032
        drop(to_main_tx);
1✔
3033

1✔
3034
        // Verify that peer is sanctioned for failed fork reconciliation attempt
1✔
3035
        assert!(state_lock
1✔
3036
            .lock_guard()
1✔
3037
            .await
1✔
3038
            .net
1✔
3039
            .get_peer_standing_from_database(peer_address1.ip())
1✔
3040
            .await
1✔
3041
            .unwrap()
1✔
3042
            .standing
1✔
3043
            .is_negative());
1✔
3044

1✔
3045
        Ok(())
1✔
3046
    }
1✔
3047

UNCOV
3048
    #[traced_test]
×
3049
    #[tokio::test]
3050
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
1✔
3051
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
3052
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
1✔
3053

1✔
3054
        let network = Network::Main;
1✔
3055
        let (
1✔
3056
            _peer_broadcast_tx,
1✔
3057
            from_main_rx_clone,
1✔
3058
            to_main_tx,
1✔
3059
            mut to_main_rx1,
1✔
3060
            mut state_lock,
1✔
3061
            hsd,
1✔
3062
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
3063
            .await
1✔
3064
            .unwrap();
1✔
3065
        let peer_address: SocketAddr = get_dummy_socket_address(0);
1✔
3066
        let genesis_block = Block::genesis(network);
1✔
3067
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
3068
            &genesis_block,
1✔
3069
            Timestamp::hours(1),
1✔
3070
            StdRng::seed_from_u64(5550001).random(),
1✔
3071
        )
1✔
3072
        .await;
1✔
3073
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
1✔
3074

1✔
3075
        let mock = Mock::new(vec![
1✔
3076
            Action::Read(PeerMessage::Block(Box::new(
1✔
3077
                block_4.clone().try_into().unwrap(),
1✔
3078
            ))),
1✔
3079
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3080
            Action::Read(PeerMessage::Block(Box::new(
1✔
3081
                block_3.clone().try_into().unwrap(),
1✔
3082
            ))),
1✔
3083
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3084
            Action::Read(PeerMessage::Block(Box::new(
1✔
3085
                block_2.clone().try_into().unwrap(),
1✔
3086
            ))),
1✔
3087
            Action::Read(PeerMessage::Bye),
1✔
3088
        ]);
1✔
3089

1✔
3090
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3091
            to_main_tx.clone(),
1✔
3092
            state_lock.clone(),
1✔
3093
            peer_address,
1✔
3094
            hsd,
1✔
3095
            true,
1✔
3096
            1,
1✔
3097
            block_4.header().timestamp,
1✔
3098
        );
1✔
3099
        peer_loop_handler
1✔
3100
            .run_wrapper(mock, from_main_rx_clone)
1✔
3101
            .await
1✔
3102
            .unwrap();
1✔
3103

1✔
3104
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
1✔
3105
            panic!("Did not find msg sent to main task");
1✔
3106
        };
1✔
3107
        assert_eq!(blocks[0].hash(), block_2.hash());
1✔
3108
        assert_eq!(blocks[1].hash(), block_3.hash());
1✔
3109
        assert_eq!(blocks[2].hash(), block_4.hash());
1✔
3110

1✔
3111
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
1✔
3112
            panic!("Must receive remove of peer block max height");
1✔
3113
        };
1✔
3114

1✔
3115
        assert!(
1✔
3116
            state_lock.lock_guard().await.net.peer_map.is_empty(),
1✔
3117
            "peer map must be empty after closing connection gracefully"
1✔
3118
        );
1✔
3119
    }
1✔
3120

UNCOV
3121
    #[traced_test]
×
3122
    #[tokio::test]
3123
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
1✔
3124
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
3125
        // receives block 3, meaning that block 2 and 1 will have to be requested.
1✔
3126

1✔
3127
        let network = Network::Main;
1✔
3128
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
3129
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3130
        let peer_address = get_dummy_socket_address(0);
1✔
3131
        let genesis_block = Block::genesis(network);
1✔
3132

1✔
3133
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
1✔
3134
            &genesis_block,
1✔
3135
            Timestamp::hours(1),
1✔
3136
            StdRng::seed_from_u64(5550001).random(),
1✔
3137
        )
1✔
3138
        .await;
1✔
3139

1✔
3140
        let mock = Mock::new(vec![
1✔
3141
            Action::Read(PeerMessage::Block(Box::new(
1✔
3142
                block_3.clone().try_into().unwrap(),
1✔
3143
            ))),
1✔
3144
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3145
            Action::Read(PeerMessage::Block(Box::new(
1✔
3146
                block_2.clone().try_into().unwrap(),
1✔
3147
            ))),
1✔
3148
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
3149
            Action::Read(PeerMessage::Block(Box::new(
1✔
3150
                block_1.clone().try_into().unwrap(),
1✔
3151
            ))),
1✔
3152
            Action::Read(PeerMessage::Bye),
1✔
3153
        ]);
1✔
3154

1✔
3155
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3156
            to_main_tx.clone(),
1✔
3157
            state_lock.clone(),
1✔
3158
            peer_address,
1✔
3159
            hsd,
1✔
3160
            true,
1✔
3161
            1,
1✔
3162
            block_3.header().timestamp,
1✔
3163
        );
1✔
3164
        peer_loop_handler
1✔
3165
            .run_wrapper(mock, from_main_rx_clone)
1✔
3166
            .await?;
1✔
3167

1✔
3168
        match to_main_rx1.recv().await {
1✔
3169
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3170
                if blocks[0].hash() != block_1.hash() {
1✔
3171
                    bail!("1st received block by main loop must be block 1");
1✔
3172
                }
1✔
3173
                if blocks[1].hash() != block_2.hash() {
1✔
3174
                    bail!("2nd received block by main loop must be block 2");
1✔
3175
                }
1✔
3176
                if blocks[2].hash() != block_3.hash() {
1✔
3177
                    bail!("3rd received block by main loop must be block 3");
1✔
3178
                }
1✔
3179
            }
1✔
3180
            _ => bail!("Did not find msg sent to main task"),
1✔
3181
        };
1✔
3182
        match to_main_rx1.recv().await {
1✔
3183
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3184
            _ => bail!("Must receive remove of peer block max height"),
1✔
3185
        }
1✔
3186

1✔
3187
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3188
            bail!("peer map must be empty after closing connection gracefully");
1✔
3189
        }
1✔
3190

1✔
3191
        Ok(())
1✔
3192
    }
1✔
3193

UNCOV
3194
    #[traced_test]
×
3195
    #[tokio::test]
3196
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
1✔
3197
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
3198
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3199
        // But the requests are interrupted by the peer sending another message: a new block
1✔
3200
        // notification.
1✔
3201

1✔
3202
        let network = Network::Main;
1✔
3203
        let (
1✔
3204
            _peer_broadcast_tx,
1✔
3205
            from_main_rx_clone,
1✔
3206
            to_main_tx,
1✔
3207
            mut to_main_rx1,
1✔
3208
            mut state_lock,
1✔
3209
            hsd,
1✔
3210
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3211
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
1✔
3212
        let genesis_block: Block = state_lock
1✔
3213
            .lock_guard()
1✔
3214
            .await
1✔
3215
            .chain
1✔
3216
            .archival_state()
1✔
3217
            .get_tip()
1✔
3218
            .await;
1✔
3219

1✔
3220
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
3221
            fake_valid_sequence_of_blocks_for_tests(
1✔
3222
                &genesis_block,
1✔
3223
                Timestamp::hours(1),
1✔
3224
                StdRng::seed_from_u64(5550001).random(),
1✔
3225
            )
1✔
3226
            .await;
1✔
3227
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3228

1✔
3229
        let mock = Mock::new(vec![
1✔
3230
            Action::Read(PeerMessage::Block(Box::new(
1✔
3231
                block_4.clone().try_into().unwrap(),
1✔
3232
            ))),
1✔
3233
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3234
            Action::Read(PeerMessage::Block(Box::new(
1✔
3235
                block_3.clone().try_into().unwrap(),
1✔
3236
            ))),
1✔
3237
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3238
            //
1✔
3239
            // Now make the interruption of the block reconciliation process
1✔
3240
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
1✔
3241
            //
1✔
3242
            // Complete the block reconciliation process by requesting the last block
1✔
3243
            // in this process, to get back to a mutually known block.
1✔
3244
            Action::Read(PeerMessage::Block(Box::new(
1✔
3245
                block_2.clone().try_into().unwrap(),
1✔
3246
            ))),
1✔
3247
            //
1✔
3248
            // Then anticipate the request of the block that was announced
1✔
3249
            // in the interruption.
1✔
3250
            // Note that we cannot anticipate the response, as only the main
1✔
3251
            // task writes to the database. And the database needs to be updated
1✔
3252
            // for the handling of block 5 to be done correctly.
1✔
3253
            Action::Write(PeerMessage::BlockRequestByHeight(
1✔
3254
                block_5.kernel.header.height,
1✔
3255
            )),
1✔
3256
            Action::Read(PeerMessage::Bye),
1✔
3257
        ]);
1✔
3258

1✔
3259
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3260
            to_main_tx.clone(),
1✔
3261
            state_lock.clone(),
1✔
3262
            peer_socket_address,
1✔
3263
            hsd,
1✔
3264
            false,
1✔
3265
            1,
1✔
3266
            block_5.header().timestamp,
1✔
3267
        );
1✔
3268
        peer_loop_handler
1✔
3269
            .run_wrapper(mock, from_main_rx_clone)
1✔
3270
            .await?;
1✔
3271

1✔
3272
        match to_main_rx1.recv().await {
1✔
3273
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3274
                if blocks[0].hash() != block_2.hash() {
1✔
3275
                    bail!("1st received block by main loop must be block 1");
1✔
3276
                }
1✔
3277
                if blocks[1].hash() != block_3.hash() {
1✔
3278
                    bail!("2nd received block by main loop must be block 2");
1✔
3279
                }
1✔
3280
                if blocks[2].hash() != block_4.hash() {
1✔
3281
                    bail!("3rd received block by main loop must be block 3");
1✔
3282
                }
1✔
3283
            }
1✔
3284
            _ => bail!("Did not find msg sent to main task"),
1✔
3285
        };
1✔
3286
        match to_main_rx1.recv().await {
1✔
3287
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3288
            _ => bail!("Must receive remove of peer block max height"),
1✔
3289
        }
1✔
3290

1✔
3291
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3292
            bail!("peer map must be empty after closing connection gracefully");
1✔
3293
        }
1✔
3294

1✔
3295
        Ok(())
1✔
3296
    }
1✔
3297

UNCOV
3298
    #[traced_test]
×
3299
    #[tokio::test]
3300
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
1✔
3301
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
1✔
3302
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3303
        // But the requests are interrupted by the peer sending another message: a request
1✔
3304
        // for a list of peers.
1✔
3305

1✔
3306
        let network = Network::Main;
1✔
3307
        let (
1✔
3308
            _peer_broadcast_tx,
1✔
3309
            from_main_rx_clone,
1✔
3310
            to_main_tx,
1✔
3311
            mut to_main_rx1,
1✔
3312
            mut state_lock,
1✔
3313
            _hsd,
1✔
3314
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
3315
        let genesis_block = Block::genesis(network);
1✔
3316
        let peer_infos: Vec<PeerInfo> = state_lock
1✔
3317
            .lock_guard()
1✔
3318
            .await
1✔
3319
            .net
1✔
3320
            .peer_map
1✔
3321
            .clone()
1✔
3322
            .into_values()
1✔
3323
            .collect::<Vec<_>>();
1✔
3324

1✔
3325
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
3326
            &genesis_block,
1✔
3327
            Timestamp::hours(1),
1✔
3328
            StdRng::seed_from_u64(5550001).random(),
1✔
3329
        )
1✔
3330
        .await;
1✔
3331
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3332

1✔
3333
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3334
        let expected_peer_list_resp = vec![
1✔
3335
            (
1✔
3336
                peer_infos[0].listen_address().unwrap(),
1✔
3337
                peer_infos[0].instance_id(),
1✔
3338
            ),
1✔
3339
            (sa_1, hsd_1.instance_id),
1✔
3340
        ];
1✔
3341
        let mock = Mock::new(vec![
1✔
3342
            Action::Read(PeerMessage::Block(Box::new(
1✔
3343
                block_4.clone().try_into().unwrap(),
1✔
3344
            ))),
1✔
3345
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3346
            Action::Read(PeerMessage::Block(Box::new(
1✔
3347
                block_3.clone().try_into().unwrap(),
1✔
3348
            ))),
1✔
3349
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3350
            //
1✔
3351
            // Now make the interruption of the block reconciliation process
1✔
3352
            Action::Read(PeerMessage::PeerListRequest),
1✔
3353
            //
1✔
3354
            // Answer the request for a peer list
1✔
3355
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
1✔
3356
            //
1✔
3357
            // Complete the block reconciliation process by requesting the last block
1✔
3358
            // in this process, to get back to a mutually known block.
1✔
3359
            Action::Read(PeerMessage::Block(Box::new(
1✔
3360
                block_2.clone().try_into().unwrap(),
1✔
3361
            ))),
1✔
3362
            Action::Read(PeerMessage::Bye),
1✔
3363
        ]);
1✔
3364

1✔
3365
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3366
            to_main_tx,
1✔
3367
            state_lock.clone(),
1✔
3368
            sa_1,
1✔
3369
            hsd_1,
1✔
3370
            true,
1✔
3371
            1,
1✔
3372
            block_4.header().timestamp,
1✔
3373
        );
1✔
3374
        peer_loop_handler
1✔
3375
            .run_wrapper(mock, from_main_rx_clone)
1✔
3376
            .await?;
1✔
3377

1✔
3378
        // Verify that blocks are sent to `main_loop` in expected ordering
1✔
3379
        match to_main_rx1.recv().await {
1✔
3380
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3381
                if blocks[0].hash() != block_2.hash() {
1✔
3382
                    bail!("1st received block by main loop must be block 1");
1✔
3383
                }
1✔
3384
                if blocks[1].hash() != block_3.hash() {
1✔
3385
                    bail!("2nd received block by main loop must be block 2");
1✔
3386
                }
1✔
3387
                if blocks[2].hash() != block_4.hash() {
1✔
3388
                    bail!("3rd received block by main loop must be block 3");
1✔
3389
                }
1✔
3390
            }
1✔
3391
            _ => bail!("Did not find msg sent to main task"),
1✔
3392
        };
1✔
3393

1✔
3394
        assert_eq!(
1✔
3395
            1,
1✔
3396
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
3397
            "One peer must remain in peer list after peer_1 closed gracefully"
1✔
3398
        );
1✔
3399

1✔
3400
        Ok(())
1✔
3401
    }
1✔
3402

UNCOV
3403
    #[traced_test]
×
3404
    #[tokio::test]
3405
    async fn empty_mempool_request_tx_test() {
1✔
3406
        // In this scenario the client receives a transaction notification from
1✔
3407
        // a peer of a transaction it doesn't know; the client must then request it.
1✔
3408

1✔
3409
        let network = Network::Main;
1✔
3410
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
1✔
3411
            get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3412
                .await
1✔
3413
                .unwrap();
1✔
3414

1✔
3415
        let spending_key = state_lock
1✔
3416
            .lock_guard()
1✔
3417
            .await
1✔
3418
            .wallet_state
1✔
3419
            .wallet_entropy
1✔
3420
            .nth_symmetric_key_for_tests(0);
1✔
3421
        let genesis_block = Block::genesis(network);
1✔
3422
        let now = genesis_block.kernel.header.timestamp;
1✔
3423
        let (transaction_1, _, _change_output) = state_lock
1✔
3424
            .lock_guard()
1✔
3425
            .await
1✔
3426
            .create_transaction_with_prover_capability(
1✔
3427
                Default::default(),
1✔
3428
                spending_key.into(),
1✔
3429
                UtxoNotificationMedium::OffChain,
1✔
3430
                NativeCurrencyAmount::coins(0),
1✔
3431
                now,
1✔
3432
                TxProvingCapability::ProofCollection,
1✔
3433
                &TritonVmJobQueue::dummy(),
1✔
3434
            )
1✔
3435
            .await
1✔
3436
            .unwrap();
1✔
3437

1✔
3438
        // Build the resulting transaction notification
1✔
3439
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3440
        let mock = Mock::new(vec![
1✔
3441
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3442
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3443
            Action::Read(PeerMessage::Transaction(Box::new(
1✔
3444
                (&transaction_1).try_into().unwrap(),
1✔
3445
            ))),
1✔
3446
            Action::Read(PeerMessage::Bye),
1✔
3447
        ]);
1✔
3448

1✔
3449
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3450

1✔
3451
        // Mock a timestamp to allow transaction to be considered valid
1✔
3452
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3453
            to_main_tx,
1✔
3454
            state_lock.clone(),
1✔
3455
            get_dummy_socket_address(0),
1✔
3456
            hsd_1.clone(),
1✔
3457
            true,
1✔
3458
            1,
1✔
3459
            now,
1✔
3460
        );
1✔
3461

1✔
3462
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3463

1✔
3464
        assert!(
1✔
3465
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3466
            "Mempool must be empty at init"
1✔
3467
        );
1✔
3468
        peer_loop_handler
1✔
3469
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3470
            .await
1✔
3471
            .unwrap();
1✔
3472

1✔
3473
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
1✔
3474
        // by the `main_loop`.
1✔
3475
        match to_main_rx1.recv().await {
1✔
3476
            Some(PeerTaskToMain::Transaction(_)) => (),
1✔
3477
            _ => panic!("Must receive remove of peer block max height"),
1✔
3478
        };
1✔
3479
    }
1✔
3480

UNCOV
3481
    #[traced_test]
×
3482
    #[tokio::test]
3483
    async fn populated_mempool_request_tx_test() -> Result<()> {
1✔
3484
        // In this scenario the peer is informed of a transaction that it already knows
1✔
3485

1✔
3486
        let network = Network::Main;
1✔
3487
        let (
1✔
3488
            _peer_broadcast_tx,
1✔
3489
            from_main_rx_clone,
1✔
3490
            to_main_tx,
1✔
3491
            mut to_main_rx1,
1✔
3492
            mut state_lock,
1✔
3493
            _hsd,
1✔
3494
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3495
            .await
1✔
3496
            .unwrap();
1✔
3497
        let spending_key = state_lock
1✔
3498
            .lock_guard()
1✔
3499
            .await
1✔
3500
            .wallet_state
1✔
3501
            .wallet_entropy
1✔
3502
            .nth_symmetric_key_for_tests(0);
1✔
3503

1✔
3504
        let genesis_block = Block::genesis(network);
1✔
3505
        let now = genesis_block.kernel.header.timestamp;
1✔
3506
        let (transaction_1, _, _change_output) = state_lock
1✔
3507
            .lock_guard()
1✔
3508
            .await
1✔
3509
            .create_transaction_with_prover_capability(
1✔
3510
                Default::default(),
1✔
3511
                spending_key.into(),
1✔
3512
                UtxoNotificationMedium::OffChain,
1✔
3513
                NativeCurrencyAmount::coins(0),
1✔
3514
                now,
1✔
3515
                TxProvingCapability::ProofCollection,
1✔
3516
                &TritonVmJobQueue::dummy(),
1✔
3517
            )
1✔
3518
            .await
1✔
3519
            .unwrap();
1✔
3520

1✔
3521
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3522
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
3523
            to_main_tx,
1✔
3524
            state_lock.clone(),
1✔
3525
            get_dummy_socket_address(0),
1✔
3526
            hsd_1.clone(),
1✔
3527
            true,
1✔
3528
            1,
1✔
3529
        );
1✔
3530
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3531

1✔
3532
        assert!(
1✔
3533
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3534
            "Mempool must be empty at init"
1✔
3535
        );
1✔
3536
        state_lock
1✔
3537
            .lock_guard_mut()
1✔
3538
            .await
1✔
3539
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
1✔
3540
            .await;
1✔
3541
        assert!(
1✔
3542
            !state_lock.lock_guard().await.mempool.is_empty(),
1✔
3543
            "Mempool must be non-empty after insertion"
1✔
3544
        );
1✔
3545

1✔
3546
        // Run the peer loop and verify expected exchange -- namely that the
1✔
3547
        // tx notification is received and the the transaction is *not*
1✔
3548
        // requested.
1✔
3549
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3550
        let mock = Mock::new(vec![
1✔
3551
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3552
            Action::Read(PeerMessage::Bye),
1✔
3553
        ]);
1✔
3554
        peer_loop_handler
1✔
3555
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3556
            .await
1✔
3557
            .unwrap();
1✔
3558

1✔
3559
        // nothing is allowed to be sent to `main_loop`
1✔
3560
        match to_main_rx1.try_recv() {
1✔
3561
            Err(TryRecvError::Empty) => (),
1✔
3562
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
1✔
3563
            Ok(_) => panic!("to_main channel must be empty"),
1✔
3564
        };
1✔
3565
        Ok(())
1✔
3566
    }
1✔
3567

3568
    mod block_proposals {
3569
        use super::*;
3570
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3571

3572
        struct TestSetup {
3573
            peer_loop_handler: PeerLoopHandler,
3574
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3575
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3576
            peer_state: MutablePeerState,
3577
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3578
            genesis_block: Block,
3579
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3580
        }
3581

3582
        async fn genesis_setup(network: Network) -> TestSetup {
2✔
3583
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
2✔
3584
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2✔
3585
                    .await
2✔
3586
                    .unwrap();
2✔
3587
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3588
            let peer_loop_handler = PeerLoopHandler::new(
2✔
3589
                to_main_tx.clone(),
2✔
3590
                alice.clone(),
2✔
3591
                get_dummy_socket_address(0),
2✔
3592
                peer_hsd.clone(),
2✔
3593
                true,
2✔
3594
                1,
2✔
3595
            );
2✔
3596
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
2✔
3597

2✔
3598
            // (peer_loop_handler, to_main_rx1)
2✔
3599
            TestSetup {
2✔
3600
                peer_broadcast_tx,
2✔
3601
                peer_loop_handler,
2✔
3602
                to_main_rx,
2✔
3603
                from_main_rx,
2✔
3604
                peer_state,
2✔
3605
                to_main_tx,
2✔
3606
                genesis_block: Block::genesis(network),
2✔
3607
            }
2✔
3608
        }
2✔
3609

UNCOV
3610
        #[traced_test]
×
3611
        #[tokio::test]
3612
        async fn accept_block_proposal_height_one() {
1✔
3613
            // Node knows genesis block, receives a block proposal for block 1
1✔
3614
            // and must accept this. Verify that main loop is informed of block
1✔
3615
            // proposal.
1✔
3616
            let TestSetup {
1✔
3617
                peer_broadcast_tx,
1✔
3618
                mut peer_loop_handler,
1✔
3619
                mut to_main_rx,
1✔
3620
                from_main_rx,
1✔
3621
                mut peer_state,
1✔
3622
                to_main_tx,
1✔
3623
                genesis_block,
1✔
3624
            } = genesis_setup(Network::Main).await;
1✔
3625
            let block1 = fake_valid_block_for_tests(
1✔
3626
                &peer_loop_handler.global_state_lock,
1✔
3627
                StdRng::seed_from_u64(5550001).random(),
1✔
3628
            )
1✔
3629
            .await;
1✔
3630

1✔
3631
            let mock = Mock::new(vec![
1✔
3632
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
1✔
3633
                Action::Read(PeerMessage::Bye),
1✔
3634
            ]);
1✔
3635
            peer_loop_handler
1✔
3636
                .run(mock, from_main_rx, &mut peer_state)
1✔
3637
                .await
1✔
3638
                .unwrap();
1✔
3639

1✔
3640
            match to_main_rx.try_recv().unwrap() {
1✔
3641
                PeerTaskToMain::BlockProposal(block) => {
1✔
3642
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
1✔
3643
                }
1✔
3644
                _ => panic!("Expected main loop to be informed of block proposal"),
1✔
3645
            };
1✔
3646

1✔
3647
            drop(to_main_tx);
1✔
3648
            drop(peer_broadcast_tx);
1✔
3649
        }
1✔
3650

UNCOV
3651
        #[traced_test]
×
3652
        #[tokio::test]
3653
        async fn accept_block_proposal_notification_height_one() {
1✔
3654
            // Node knows genesis block, receives a block proposal notification
1✔
3655
            // for block 1 and must accept this by requesting the block
1✔
3656
            // proposal from peer.
1✔
3657
            let TestSetup {
1✔
3658
                peer_broadcast_tx,
1✔
3659
                mut peer_loop_handler,
1✔
3660
                to_main_rx: _,
1✔
3661
                from_main_rx,
1✔
3662
                mut peer_state,
1✔
3663
                to_main_tx,
1✔
3664
                ..
1✔
3665
            } = genesis_setup(Network::Main).await;
1✔
3666
            let block1 = fake_valid_block_for_tests(
1✔
3667
                &peer_loop_handler.global_state_lock,
1✔
3668
                StdRng::seed_from_u64(5550001).random(),
1✔
3669
            )
1✔
3670
            .await;
1✔
3671

1✔
3672
            let mock = Mock::new(vec![
1✔
3673
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
1✔
3674
                Action::Write(PeerMessage::BlockProposalRequest(
1✔
3675
                    BlockProposalRequest::new(block1.body().mast_hash()),
1✔
3676
                )),
1✔
3677
                Action::Read(PeerMessage::Bye),
1✔
3678
            ]);
1✔
3679
            peer_loop_handler
1✔
3680
                .run(mock, from_main_rx, &mut peer_state)
1✔
3681
                .await
1✔
3682
                .unwrap();
1✔
3683

1✔
3684
            drop(to_main_tx);
1✔
3685
            drop(peer_broadcast_tx);
1✔
3686
        }
1✔
3687
    }
3688

3689
    mod proof_qualities {
3690
        use strum::IntoEnumIterator;
3691

3692
        use super::*;
3693
        use crate::config_models::cli_args;
3694
        use crate::models::blockchain::transaction::Transaction;
3695
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3696
        use crate::tests::shared::mock_genesis_global_state;
3697

3698
        async fn tx_of_proof_quality(
2✔
3699
            network: Network,
2✔
3700
            quality: TransactionProofQuality,
2✔
3701
        ) -> Transaction {
2✔
3702
            let wallet_secret = WalletEntropy::devnet_wallet();
2✔
3703
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
2✔
3704
            let alice =
2✔
3705
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
2✔
3706
                    .await;
2✔
3707
            let alice = alice.lock_guard().await;
2✔
3708
            let genesis_block = alice.chain.light_state();
2✔
3709
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
2✔
3710
            let prover_capability = match quality {
2✔
3711
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
1✔
3712
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
1✔
3713
            };
3714
            alice
2✔
3715
                .create_transaction_with_prover_capability(
2✔
3716
                    vec![].into(),
2✔
3717
                    alice_key.into(),
2✔
3718
                    UtxoNotificationMedium::OffChain,
2✔
3719
                    NativeCurrencyAmount::coins(1),
2✔
3720
                    in_seven_months,
2✔
3721
                    prover_capability,
2✔
3722
                    &TritonVmJobQueue::dummy(),
2✔
3723
                )
2✔
3724
                .await
2✔
3725
                .unwrap()
2✔
3726
                .0
2✔
3727
        }
2✔
3728

UNCOV
3729
        #[traced_test]
×
3730
        #[tokio::test]
3731
        async fn client_favors_higher_proof_quality() {
1✔
3732
            // In this scenario the peer is informed of a transaction that it
1✔
3733
            // already knows, and it's tested that it checks the proof quality
1✔
3734
            // field and verifies that it exceeds the proof in the mempool
1✔
3735
            // before requesting the transasction.
1✔
3736
            let network = Network::Main;
1✔
3737
            let proof_collection_tx =
1✔
3738
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
1✔
3739
            let single_proof_tx =
1✔
3740
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
1✔
3741

1✔
3742
            for (own_tx_pq, new_tx_pq) in
4✔
3743
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
1✔
3744
            {
1✔
3745
                use TransactionProofQuality::*;
1✔
3746

1✔
3747
                let (
1✔
3748
                    _peer_broadcast_tx,
4✔
3749
                    from_main_rx_clone,
4✔
3750
                    to_main_tx,
4✔
3751
                    mut to_main_rx1,
4✔
3752
                    mut alice,
4✔
3753
                    handshake_data,
4✔
3754
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
4✔
3755
                    .await
4✔
3756
                    .unwrap();
4✔
3757

1✔
3758
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
4✔
3759
                    (ProofCollection, ProofCollection) => {
1✔
3760
                        (&proof_collection_tx, &proof_collection_tx)
1✔
3761
                    }
1✔
3762
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
1✔
3763
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
1✔
3764
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
1✔
3765
                };
1✔
3766

1✔
3767
                alice
4✔
3768
                    .lock_guard_mut()
4✔
3769
                    .await
4✔
3770
                    .mempool_insert(own_tx.to_owned(), TransactionOrigin::Foreign)
4✔
3771
                    .await;
4✔
3772

1✔
3773
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
4✔
3774

4✔
3775
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4✔
3776
                let mock = if own_proof_is_supreme {
4✔
3777
                    Mock::new(vec![
3✔
3778
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3✔
3779
                        Action::Read(PeerMessage::Bye),
3✔
3780
                    ])
3✔
3781
                } else {
1✔
3782
                    Mock::new(vec![
1✔
3783
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3784
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3785
                        Action::Read(PeerMessage::Transaction(Box::new(
1✔
3786
                            new_tx.try_into().unwrap(),
1✔
3787
                        ))),
1✔
3788
                        Action::Read(PeerMessage::Bye),
1✔
3789
                    ])
1✔
3790
                };
1✔
3791

1✔
3792
                let now = proof_collection_tx.kernel.timestamp;
4✔
3793
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4✔
3794
                    to_main_tx,
4✔
3795
                    alice.clone(),
4✔
3796
                    get_dummy_socket_address(0),
4✔
3797
                    handshake_data.clone(),
4✔
3798
                    true,
4✔
3799
                    1,
4✔
3800
                    now,
4✔
3801
                );
4✔
3802
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4✔
3803

4✔
3804
                peer_loop_handler
4✔
3805
                    .run(mock, from_main_rx_clone, &mut peer_state)
4✔
3806
                    .await
4✔
3807
                    .unwrap();
4✔
3808

4✔
3809
                if own_proof_is_supreme {
4✔
3810
                    match to_main_rx1.try_recv() {
3✔
3811
                        Err(TryRecvError::Empty) => (),
3✔
3812
                        Err(TryRecvError::Disconnected) => {
1✔
3813
                            panic!("to_main channel must still be open")
1✔
3814
                        }
1✔
3815
                        Ok(_) => panic!("to_main channel must be empty"),
1✔
3816
                    }
1✔
3817
                } else {
1✔
3818
                    match to_main_rx1.try_recv() {
1✔
3819
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
1✔
3820
                        Err(TryRecvError::Disconnected) => {
1✔
3821
                            panic!("to_main channel must still be open")
1✔
3822
                        }
1✔
3823
                        Ok(PeerTaskToMain::Transaction(_)) => (),
1✔
3824
                        _ => panic!("Unexpected result from channel"),
1✔
3825
                    }
1✔
3826
                }
1✔
3827
            }
1✔
3828
        }
1✔
3829
    }
3830

3831
    mod sync_challenges {
3832
        use super::*;
3833
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3834

UNCOV
3835
        #[traced_test]
×
3836
        #[tokio::test]
3837
        async fn bad_sync_challenge_height_greater_than_tip() {
1✔
3838
            // Criterium: Challenge height may not exceed that of tip in the
1✔
3839
            // request.
1✔
3840

1✔
3841
            let network = Network::Main;
1✔
3842
            let (
1✔
3843
                _alice_main_to_peer_tx,
1✔
3844
                alice_main_to_peer_rx,
1✔
3845
                alice_peer_to_main_tx,
1✔
3846
                alice_peer_to_main_rx,
1✔
3847
                mut alice,
1✔
3848
                alice_hsd,
1✔
3849
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
3850
                .await
1✔
3851
                .unwrap();
1✔
3852
            let genesis_block: Block = Block::genesis(network);
1✔
3853
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
1✔
3854
                &genesis_block,
1✔
3855
                Timestamp::hours(1),
1✔
3856
                [0u8; 32],
1✔
3857
            )
1✔
3858
            .await;
1✔
3859
            for block in &blocks {
12✔
3860
                alice.set_new_tip(block.clone()).await.unwrap();
11✔
3861
            }
1✔
3862

1✔
3863
            let bh12 = blocks.last().unwrap().header().height;
1✔
3864
            let sync_challenge = SyncChallenge {
1✔
3865
                tip_digest: blocks[9].hash(),
1✔
3866
                challenges: [bh12; 10],
1✔
3867
            };
1✔
3868
            let alice_p2p_messages = Mock::new(vec![
1✔
3869
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3870
                Action::Read(PeerMessage::Bye),
1✔
3871
            ]);
1✔
3872

1✔
3873
            let peer_address = get_dummy_socket_address(0);
1✔
3874
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3875
                alice_peer_to_main_tx.clone(),
1✔
3876
                alice.clone(),
1✔
3877
                peer_address,
1✔
3878
                alice_hsd,
1✔
3879
                false,
1✔
3880
                1,
1✔
3881
            );
1✔
3882
            alice_peer_loop_handler
1✔
3883
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3884
                .await
1✔
3885
                .unwrap();
1✔
3886

1✔
3887
            drop(alice_peer_to_main_rx);
1✔
3888

1✔
3889
            let latest_sanction = alice
1✔
3890
                .lock_guard()
1✔
3891
                .await
1✔
3892
                .net
1✔
3893
                .get_peer_standing_from_database(peer_address.ip())
1✔
3894
                .await
1✔
3895
                .unwrap();
1✔
3896
            assert_eq!(
1✔
3897
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3898
                latest_sanction
1✔
3899
                    .latest_punishment
1✔
3900
                    .expect("peer must be sanctioned")
1✔
3901
                    .0
1✔
3902
            );
1✔
3903
        }
1✔
3904

UNCOV
3905
        #[traced_test]
×
3906
        #[tokio::test]
3907
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
1✔
3908
            // Criterium: Challenge may not point to genesis block, or block 1, as
1✔
3909
            // tip.
1✔
3910

1✔
3911
            let network = Network::Main;
1✔
3912
            let genesis_block: Block = Block::genesis(network);
1✔
3913

1✔
3914
            let alice_cli = cli_args::Args::default();
1✔
3915
            let (
1✔
3916
                _alice_main_to_peer_tx,
1✔
3917
                alice_main_to_peer_rx,
1✔
3918
                alice_peer_to_main_tx,
1✔
3919
                alice_peer_to_main_rx,
1✔
3920
                alice,
1✔
3921
                alice_hsd,
1✔
3922
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
1✔
3923

1✔
3924
            let sync_challenge = SyncChallenge {
1✔
3925
                tip_digest: genesis_block.hash(),
1✔
3926
                challenges: [BlockHeight::genesis(); 10],
1✔
3927
            };
1✔
3928

1✔
3929
            let alice_p2p_messages = Mock::new(vec![
1✔
3930
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3931
                Action::Read(PeerMessage::Bye),
1✔
3932
            ]);
1✔
3933

1✔
3934
            let peer_address = get_dummy_socket_address(0);
1✔
3935
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3936
                alice_peer_to_main_tx.clone(),
1✔
3937
                alice.clone(),
1✔
3938
                peer_address,
1✔
3939
                alice_hsd,
1✔
3940
                false,
1✔
3941
                1,
1✔
3942
            );
1✔
3943
            alice_peer_loop_handler
1✔
3944
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3945
                .await
1✔
3946
                .unwrap();
1✔
3947

1✔
3948
            drop(alice_peer_to_main_rx);
1✔
3949

1✔
3950
            let latest_sanction = alice
1✔
3951
                .lock_guard()
1✔
3952
                .await
1✔
3953
                .net
1✔
3954
                .get_peer_standing_from_database(peer_address.ip())
1✔
3955
                .await
1✔
3956
                .unwrap();
1✔
3957
            assert_eq!(
1✔
3958
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3959
                latest_sanction
1✔
3960
                    .latest_punishment
1✔
3961
                    .expect("peer must be sanctioned")
1✔
3962
                    .0
1✔
3963
            );
1✔
3964
        }
1✔
3965

UNCOV
3966
        #[traced_test]
×
3967
        #[tokio::test]
3968
        async fn sync_challenge_happy_path() -> Result<()> {
1✔
3969
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
1✔
3970
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
1✔
3971
            // sync mode.
1✔
3972

1✔
3973
            let mut rng = rand::rng();
1✔
3974
            let network = Network::Main;
1✔
3975
            let genesis_block: Block = Block::genesis(network);
1✔
3976

1✔
3977
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
1✔
3978
            let alice_cli = cli_args::Args {
1✔
3979
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
1✔
3980
                ..Default::default()
1✔
3981
            };
1✔
3982
            let (
1✔
3983
                _alice_main_to_peer_tx,
1✔
3984
                alice_main_to_peer_rx,
1✔
3985
                alice_peer_to_main_tx,
1✔
3986
                mut alice_peer_to_main_rx,
1✔
3987
                mut alice,
1✔
3988
                alice_hsd,
1✔
3989
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
1✔
3990
            let _alice_socket_address = get_dummy_socket_address(0);
1✔
3991

1✔
3992
            let (
1✔
3993
                _bob_main_to_peer_tx,
1✔
3994
                _bob_main_to_peer_rx,
1✔
3995
                _bob_peer_to_main_tx,
1✔
3996
                _bob_peer_to_main_rx,
1✔
3997
                mut bob,
1✔
3998
                _bob_hsd,
1✔
3999
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
4000
            let bob_socket_address = get_dummy_socket_address(0);
1✔
4001

1✔
4002
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
4003
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
1✔
4004
            assert!(
1✔
4005
                block_1.is_valid(&genesis_block, now).await,
1✔
4006
                "Block must be valid for this test to make sense"
1✔
4007
            );
1✔
4008
            let alice_tip = &block_1;
1✔
4009
            alice.set_new_tip(block_1.clone()).await?;
1✔
4010
            bob.set_new_tip(block_1.clone()).await?;
1✔
4011

1✔
4012
            // produce enough blocks to ensure alice needs to go into sync mode
1✔
4013
            // with this block notification.
1✔
4014
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
1✔
4015
                &block_1,
1✔
4016
                TARGET_BLOCK_INTERVAL,
1✔
4017
                rng.random(),
1✔
4018
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
1✔
4019
            )
1✔
4020
            .await;
1✔
4021
            for block in &blocks {
18✔
4022
                bob.set_new_tip(block.clone()).await?;
17✔
4023
            }
1✔
4024
            let bob_tip = blocks.last().unwrap();
1✔
4025

1✔
4026
            let block_notification_from_bob = PeerBlockNotification {
1✔
4027
                hash: bob_tip.hash(),
1✔
4028
                height: bob_tip.header().height,
1✔
4029
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
1✔
4030
            };
1✔
4031

1✔
4032
            let alice_rng_seed = rng.random::<[u8; 32]>();
1✔
4033
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
1✔
4034
            let sync_challenge_from_alice = SyncChallenge::generate(
1✔
4035
                &block_notification_from_bob,
1✔
4036
                alice_tip.header().height,
1✔
4037
                alice_rng_clone.random(),
1✔
4038
            );
1✔
4039

1✔
4040
            println!(
1✔
4041
                "sync challenge from alice:\n{:?}",
1✔
4042
                sync_challenge_from_alice
1✔
4043
            );
1✔
4044

1✔
4045
            let sync_challenge_response_from_bob = bob
1✔
4046
                .lock_guard()
1✔
4047
                .await
1✔
4048
                .response_to_sync_challenge(sync_challenge_from_alice)
1✔
4049
                .await
1✔
4050
                .expect("should be able to respond to sync challenge");
1✔
4051

1✔
4052
            let alice_p2p_messages = Mock::new(vec![
1✔
4053
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4054
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
1✔
4055
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
1✔
4056
                    sync_challenge_response_from_bob,
1✔
4057
                ))),
1✔
4058
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4059
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
1✔
4060
                // The absence of a Write here checks that a 2nd challenge isn't sent
1✔
4061
                // when a successful was just received.
1✔
4062
                Action::Read(PeerMessage::Bye),
1✔
4063
            ]);
1✔
4064

1✔
4065
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
4066
                alice_peer_to_main_tx.clone(),
1✔
4067
                alice.clone(),
1✔
4068
                bob_socket_address,
1✔
4069
                alice_hsd,
1✔
4070
                false,
1✔
4071
                1,
1✔
4072
                bob_tip.header().timestamp,
1✔
4073
            );
1✔
4074
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
1✔
4075
            alice_peer_loop_handler
1✔
4076
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
4077
                .await?;
1✔
4078

1✔
4079
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
1✔
4080
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
1✔
4081
            expected_anchor_mmra.append(bob_tip.hash());
1✔
4082
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
4083
                peer_address: bob_socket_address,
1✔
4084
                claimed_height: bob_tip.header().height,
1✔
4085
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
1✔
4086
                claimed_block_mmra: expected_anchor_mmra,
1✔
4087
            };
1✔
4088
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
1✔
4089
            assert_eq!(
1✔
4090
                expected_message_from_alice_peer_loop,
1✔
4091
                observed_message_from_alice_peer_loop
1✔
4092
            );
1✔
4093

1✔
4094
            Ok(())
1✔
4095
        }
1✔
4096
    }
4097
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc