• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 13900163354

17 Mar 2025 01:04PM UTC coverage: 84.344% (+0.2%) from 84.172%
13900163354

Pull #503

github

web-flow
Merge 8872cc52d into 5618528a4
Pull Request #503: feat: Allow restriction of number of inputs per tx

359 of 377 new or added lines in 11 files covered. (95.23%)

1264 existing lines in 19 files now uncovered.

51079 of 60560 relevant lines covered (84.34%)

175910.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.01
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::Duration;
5
use std::time::SystemTime;
6

7
use anyhow::bail;
8
use anyhow::Result;
9
use chrono::DateTime;
10
use chrono::Utc;
11
use futures::sink::Sink;
12
use futures::sink::SinkExt;
13
use futures::stream::TryStream;
14
use futures::stream::TryStreamExt;
15
use itertools::Itertools;
16
use rand::rngs::StdRng;
17
use rand::Rng;
18
use rand::SeedableRng;
19
use tasm_lib::triton_vm::prelude::Digest;
20
use tasm_lib::twenty_first::prelude::Mmr;
21
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
22
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
23
use tokio::select;
24
use tokio::sync::broadcast;
25
use tokio::sync::mpsc;
26
use tracing::debug;
27
use tracing::error;
28
use tracing::info;
29
use tracing::warn;
30

31
use crate::connect_to_peers::close_peer_connected_callback;
32
use crate::macros::fn_name;
33
use crate::macros::log_slow_scope;
34
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
35
use crate::models::blockchain::block::block_height::BlockHeight;
36
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
37
use crate::models::blockchain::block::Block;
38
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
39
use crate::models::blockchain::transaction::Transaction;
40
use crate::models::channel::MainToPeerTask;
41
use crate::models::channel::PeerTaskToMain;
42
use crate::models::channel::PeerTaskToMainTransaction;
43
use crate::models::peer::bootstrap_info::BootstrapInfo;
44
use crate::models::peer::bootstrap_info::BootstrapStatus;
45
use crate::models::peer::handshake_data::HandshakeData;
46
use crate::models::peer::peer_info::PeerConnectionInfo;
47
use crate::models::peer::peer_info::PeerInfo;
48
use crate::models::peer::transfer_block::TransferBlock;
49
use crate::models::peer::BlockProposalRequest;
50
use crate::models::peer::BlockRequestBatch;
51
use crate::models::peer::IssuedSyncChallenge;
52
use crate::models::peer::MutablePeerState;
53
use crate::models::peer::NegativePeerSanction;
54
use crate::models::peer::PeerMessage;
55
use crate::models::peer::PeerSanction;
56
use crate::models::peer::PeerStanding;
57
use crate::models::peer::PositivePeerSanction;
58
use crate::models::peer::SyncChallenge;
59
use crate::models::proof_abstractions::mast_hash::MastHash;
60
use crate::models::proof_abstractions::timestamp::Timestamp;
61
use crate::models::state::block_proposal::BlockProposalRejectError;
62
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
63
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
64
use crate::models::state::GlobalState;
65
use crate::models::state::GlobalStateLock;
66
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
67

68
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
69
const MAX_PEER_LIST_LENGTH: usize = 10;
70
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
71

72
const KEEP_CONNECTION_ALIVE: bool = false;
73
const DISCONNECT_CONNECTION: bool = true;
74

75
pub type PeerStandingNumber = i32;
76

77
/// Handles messages from peers via TCP
///
/// also handles messages from main task over the main-to-peer-tasks broadcast
/// channel.
#[derive(Debug, Clone)]
pub struct PeerLoopHandler {
    // Channel for forwarding validated blocks, peer lists, etc. to the main
    // task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
    // Shared global node state; read for queries, locked for write when
    // sanctioning peers or updating bootstrap status.
    global_state_lock: GlobalStateLock,
    // Address of the remote peer this handler serves.
    peer_address: SocketAddr,
    // Handshake data received from the peer at connection establishment.
    // NOTE(review): not referenced in this part of the file — confirm usage.
    peer_handshake_data: HandshakeData,
    // Whether the remote peer initiated the connection.
    // NOTE(review): not referenced in this part of the file — confirm usage.
    inbound_connection: bool,
    // Hop distance for peer discovery; incremented when forwarding discovered
    // peer lists to the main task.
    distance: u8,
    // Per-handler RNG; replaceable in tests via `set_rng` for determinism.
    rng: StdRng,
    // Test-only mocked timestamp; when set, `now()` returns it instead of
    // wall-clock time.
    #[cfg(test)]
    mock_now: Option<Timestamp>,
}
93

94
impl PeerLoopHandler {
95
    pub(crate) fn new(
39✔
96
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
39✔
97
        global_state_lock: GlobalStateLock,
39✔
98
        peer_address: SocketAddr,
39✔
99
        peer_handshake_data: HandshakeData,
39✔
100
        inbound_connection: bool,
39✔
101
        distance: u8,
39✔
102
    ) -> Self {
39✔
103
        Self {
39✔
104
            to_main_tx,
39✔
105
            global_state_lock,
39✔
106
            peer_address,
39✔
107
            peer_handshake_data,
39✔
108
            inbound_connection,
39✔
109
            distance,
39✔
110
            rng: StdRng::from_rng(&mut rand::rng()),
39✔
111
            #[cfg(test)]
39✔
112
            mock_now: None,
39✔
113
        }
39✔
114
    }
39✔
115

116
    /// Allows for mocked timestamps such that time dependencies may be tested.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        // Identical to `new`, except `mock_now` is pre-set so that `now()`
        // returns the mocked timestamp instead of wall-clock time.
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);
        handler
    }
138

139
    /// Overwrite the random number generator object with a specific one.
    ///
    /// Useful for derandomizing tests: a seeded `StdRng` makes every random
    /// choice taken by this handler reproducible.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        self.rng = rng;
    }
146

147
    /// Current timestamp, honoring the test-only mock.
    ///
    /// In non-test builds this is always wall-clock time. In test builds, a
    /// timestamp installed via `with_mocked_time` takes precedence; otherwise
    /// wall-clock time is used.
    fn now(&self) -> Timestamp {
        #[cfg(not(test))]
        {
            Timestamp::now()
        }
        #[cfg(test)]
        {
            // Fall back to real time when no mock was installed.
            self.mock_now.unwrap_or(Timestamp::now())
        }
    }
157

158
    /// Punish a peer for bad behavior.
159
    ///
160
    /// Return `Err` if the peer in question is (now) banned.
161
    ///
162
    /// # Locking:
163
    ///   * acquires `global_state_lock` for write
164
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
165
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
166
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
167
        debug!(
6✔
168
            "Peer standing before punishment is {}",
×
169
            global_state_mut
×
170
                .net
×
UNCOV
171
                .peer_map
×
UNCOV
172
                .get(&self.peer_address)
×
UNCOV
173
                .unwrap()
×
174
                .standing
175
        );
176

177
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
UNCOV
178
            bail!("Could not read peer map.");
×
179
        };
180
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
181
        if let Err(err) = sanction_result {
6✔
182
            warn!("Banning peer: {err}");
1✔
183
        }
5✔
184

185
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
186
    }
6✔
187

188
    /// Reward a peer for good behavior.
189
    ///
190
    /// Return `Err` if the peer in question is banned.
191
    ///
192
    /// # Locking:
193
    ///   * acquires `global_state_lock` for write
194
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
7✔
195
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
7✔
196
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
7✔
197
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
7✔
198
            error!("Could not read peer map.");
1✔
199
            return Ok(());
1✔
200
        };
201
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
6✔
202
        if sanction_result.is_err() {
6✔
UNCOV
203
            error!("Cannot reward banned peer");
×
204
        }
6✔
205

206
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
6✔
207
    }
7✔
208

209
    /// Potentially update a peer's bootstrap status.
    ///
    /// Per connection, the bootstrap status can be set once without
    /// incurring [punishment](Self::punish). After a cooldown period, the
    /// bootstrap status can be set again. The cooldown exists to prevent peers
    /// from forcing many write-lock acquisition of the global state lock.
    ///
    /// This method only acquires a write lock for the global state lock if
    /// necessary.
    ///
    /// # Locking:
    ///   * acquires `global_state_lock` for read
    ///   * might acquire `global_state_lock` for write
    async fn handle_bootstrap_status_message(&mut self, status: BootstrapStatus) -> Result<()> {
        const BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD: Duration = Duration::from_secs(5 * 60);

        let peer_address = self.peer_address;
        // First read lock: when did this connection start?
        // NOTE(review): the lock is acquired several separate times below, so
        // state may change between acquisitions — presumably acceptable here;
        // confirm no stronger atomicity is required.
        let Some(connection_start_time) = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .get(&peer_address)
            .map(|info| info.own_timestamp_connection_established)
        else {
            warn!("Peer {peer_address} not found in peer map when updating bootstrap status.");
            return Ok(()); // not great, not ban-worthy
        };
        // Second read lock: when was the bootstrap status last set?
        // UNIX_EPOCH serves as "never", guaranteeing the first-update branch
        // below is taken for a fresh peer.
        let last_update_time = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .bootstrap_status
            .get(&peer_address)
            .map(|info| info.last_set)
            .unwrap_or(SystemTime::UNIX_EPOCH);

        // First update on this connection: set without punishment.
        if last_update_time < connection_start_time {
            debug!("Setting bootstrap status of peer {peer_address} to \"{status}\"");
            self.global_state_lock
                .lock_guard_mut()
                .await
                .net
                .bootstrap_status
                .insert(peer_address, BootstrapInfo::new(status));
            return Ok(());
        }

        // Repeated update: enforce the cooldown.
        let Ok(duration_since_last_update) = SystemTime::now().duration_since(last_update_time)
        else {
            warn!("Last bootstrap update of peer {peer_address} is in the future.");
            return Ok(()); // not great, not ban-worthy
        };
        if duration_since_last_update < BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD {
            // Within cooldown: punish as spam and drop the update.
            info!(
                "Punishing peer {peer_address} for bootstrap update within cooldown period. \
                Update received after {duration} seconds of last update, \
                cooldown period is {cooldown} seconds.",
                duration = duration_since_last_update.as_secs(),
                cooldown = BOOTSTRAP_STATUS_UPDATE_COOLDOWN_PERIOD.as_secs(),
            );
            self.punish(NegativePeerSanction::BootstrapStatusUpdateSpam)
                .await?;
            return Ok(());
        }

        // Past the cooldown: accept the update, but still apply the milder
        // `BootstrapStatusUpdate` sanction for the repeated write.
        debug!("Updating bootstrap status of peer {peer_address} to \"{status}\"");
        self.global_state_lock
            .lock_guard_mut()
            .await
            .net
            .bootstrap_status
            .insert(peer_address, BootstrapInfo::new(status));
        self.punish(NegativePeerSanction::BootstrapStatusUpdate)
            .await?;

        Ok(())
    }
289

290
    /// Construct a batch response, with blocks and their MMR membership proofs
291
    /// relative to a specified anchor.
292
    ///
293
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
294
    /// a block height of the response exceeds own tip height.
295
    async fn batch_response(
16✔
296
        state: &GlobalState,
16✔
297
        blocks: Vec<Block>,
16✔
298
        anchor: &MmrAccumulator,
16✔
299
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
300
        let own_tip_height = state.chain.light_state().header().height;
16✔
301
        let block_heights_match_anchor = blocks
16✔
302
            .iter()
16✔
303
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
304
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
305
        if !block_heights_match_anchor || !block_heights_known {
16✔
UNCOV
306
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
UNCOV
307
                Some(height) => height.to_string(),
×
UNCOV
308
                None => "None".to_owned(),
×
309
            };
310

311
            debug!("max_block_height: {max_block_height}");
×
UNCOV
312
            debug!("own_tip_height: {own_tip_height}");
×
UNCOV
313
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
UNCOV
314
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
UNCOV
315
            debug!("block_heights_known: {block_heights_known}");
×
UNCOV
316
            return None;
×
317
        }
16✔
318

16✔
319
        let mut ret = vec![];
16✔
320
        for block in blocks {
62✔
321
            let mmr_mp = state
46✔
322
                .chain
46✔
323
                .archival_state()
46✔
324
                .archival_block_mmr
46✔
325
                .ammr()
46✔
326
                .prove_membership_relative_to_smaller_mmr(
46✔
327
                    block.header().height.into(),
46✔
328
                    anchor.num_leafs(),
46✔
329
                )
46✔
330
                .await;
46✔
331
            let block: TransferBlock = block.try_into().unwrap();
46✔
332
            ret.push((block, mmr_mp));
46✔
333
        }
334

335
        Some(ret)
16✔
336
    }
16✔
337

338
    /// Handle validation and send all blocks to the main task if they're all
    /// valid. Use with a list of blocks or a single block. When the
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
    /// list is the `i`th block. The parent of element zero in this list is
    /// `parent_of_first_block`.
    ///
    /// # Return Value
    ///  - `Err` when the connection should be closed;
    ///  - `Ok(None)` if some block is invalid
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
    ///    are not syncing;
    ///  - `Ok(None)` if the last block has insufficient height and we are
    ///    syncing;
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
    ///    highest height in the batch.
    ///
    /// A return value of Ok(Some(_)) means that the message was passed on to
    /// main loop.
    ///
    /// # Locking
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    ///
    /// # Panics
    ///
    ///  - Panics if called with the empty list.
    async fn handle_blocks(
        &mut self,
        received_blocks: Vec<Block>,
        parent_of_first_block: Block,
    ) -> Result<Option<BlockHeight>> {
        debug!(
            "attempting to validate {} {}",
            received_blocks.len(),
            if received_blocks.len() == 1 {
                "block"
            } else {
                "blocks"
            }
        );
        let now = self.now();
        debug!("validating with respect to current timestamp {now}");
        // Walk the chain: each block is validated against its predecessor.
        let mut previous_block = &parent_of_first_block;
        for new_block in &received_blocks {
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
            // NOTE(review): full validity is computed eagerly, even when the
            // PoW check below already fails — presumably acceptable cost;
            // confirm if `is_valid` is expensive.
            let new_block_is_valid = new_block.is_valid(previous_block, now).await;
            debug!("new block is valid? {new_block_is_valid}");
            if !new_block_has_proof_of_work {
                warn!(
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
                warn!(
                    "Proof of work should be {} (or more) but was [{}].",
                    previous_block.kernel.header.difficulty.target(),
                    new_block.hash().values().iter().join(", ")
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block due to insufficient PoW");
                return Ok(None);
            } else if !new_block_is_valid {
                warn!(
                    "Received invalid block of height {} from peer with IP {}",
                    new_block.kernel.header.height, self.peer_address
                );
                self.punish(NegativePeerSanction::InvalidBlock((
                    new_block.kernel.header.height,
                    new_block.hash(),
                )))
                .await?;
                warn!("Failed to validate block: invalid block");
                return Ok(None);
            }
            info!(
                "Block with height {} is valid. mined: {}",
                new_block.kernel.header.height,
                new_block.kernel.header.timestamp.standard_format()
            );

            previous_block = new_block;
        }

        // evaluate the fork choice rule
        debug!("Checking last block's canonicity ...");
        // unwrap is covered by the documented panic on an empty list.
        let last_block = received_blocks.last().unwrap();
        let is_canonical = self
            .global_state_lock
            .lock_guard()
            .await
            .incoming_block_is_more_canonical(last_block);
        let last_block_height = last_block.header().height;
        // While syncing, a batch is also acceptable when it beats the current
        // sync champion's height (or no champion exists yet).
        // NOTE(review): two separate read-lock acquisitions; state may change
        // in between — confirm this race is tolerable.
        let sync_mode_active_and_have_new_champion = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .sync_anchor
            .as_ref()
            .is_some_and(|x| {
                x.champion
                    .is_none_or(|(height, _)| height < last_block_height)
            });
        if !is_canonical && !sync_mode_active_and_have_new_champion {
            warn!(
                "Received {} blocks from peer but incoming blocks are less \
            canonical than current tip, or current sync-champion.",
                received_blocks.len()
            );
            return Ok(None);
        }

        // Send the new blocks to the main task which handles the state update
        // and storage to the database.
        let number_of_received_blocks = received_blocks.len();
        self.to_main_tx
            .send(PeerTaskToMain::NewBlocks(received_blocks))
            .await?;
        info!(
            "Updated block info by block from peer. block height {}",
            last_block_height
        );

        // Valuable, new, hard-to-produce information. Reward peer.
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
            .await?;

        Ok(Some(last_block_height))
    }
472

473
    /// Take a single block received from a peer and (attempt to) find a path
    /// between the received block and a common ancestor stored in the blocks
    /// database.
    ///
    /// This function attempts to find the parent of the received block, either
    /// by searching the database or by requesting it from a peer.
    ///  - If the parent is not stored, it is requested from the peer and the
    ///    received block is pushed to the fork reconciliation list for later
    ///    handling by this function. The fork reconciliation list starts out
    ///    empty, but grows as more parents are requested and transmitted.
    ///  - If the parent is found in the database, a) block handling continues:
    ///    the entire list of fork reconciliation blocks are passed down the
    ///    pipeline, potentially leading to a state update; and b) the fork
    ///    reconciliation list is cleared.
    ///
    /// Locking:
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
    ///     `self.reward(..)`.
    async fn try_ensure_path<S>(
        &mut self,
        received_block: Box<Block>,
        peer: &mut S,
        peer_state: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        // Does the received block match the fork reconciliation list?
        // Blocks arrive newest-first, so the most recently stored block must
        // be a valid successor of the block received now.
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
            peer_state.fork_reconciliation_blocks.last()
        {
            let valid = successor
                .is_valid(received_block.as_ref(), self.now())
                .await;
            if !valid {
                warn!(
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
                        peer_state.fork_reconciliation_blocks.len() + 1
                    );
            }
            valid
        } else {
            // Empty list: nothing to match against yet.
            true
        };

        // Are we running out of RAM?
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
            >= self.global_state_lock.cli().sync_mode_threshold;
        if too_many_blocks {
            warn!(
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
                peer_state.fork_reconciliation_blocks.len() + 1
            );
        }

        // Block mismatch or too many blocks: abort!
        // Punish, drop all accumulated reconciliation state, and bail out.
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
            self.punish(NegativePeerSanction::ForkResolutionError((
                received_block.header().height,
                peer_state.fork_reconciliation_blocks.len() as u16,
                received_block.hash(),
            )))
            .await?;
            peer_state.fork_reconciliation_blocks = vec![];
            return Ok(());
        }

        // otherwise, append
        peer_state.fork_reconciliation_blocks.push(*received_block);

        // Try fetch parent
        let received_block_header = *peer_state
            .fork_reconciliation_blocks
            .last()
            .unwrap()
            .header();

        let parent_digest = received_block_header.prev_block_digest;
        let parent_height = received_block_header.height.previous()
            .expect("transferred block must have previous height because genesis block cannot be transferred");
        debug!("Try ensure path: fetching parent block");
        let parent_block = self
            .global_state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_block(parent_digest)
            .await?;
        debug!(
            "Completed parent block fetching from DB: {}",
            if parent_block.is_some() {
                "found".to_string()
            } else {
                "not found".to_string()
            }
        );

        // If parent is not known (but not genesis) request it.
        let Some(parent_block) = parent_block else {
            if parent_height.is_genesis() {
                // Chain bottoms out at a different genesis block than ours —
                // irreconcilable.
                peer_state.fork_reconciliation_blocks.clear();
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
                return Ok(());
            }
            info!(
                "Parent not known: Requesting previous block with height {} from peer",
                parent_height
            );

            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
                .await?;

            return Ok(());
        };

        // We want to treat the received fork reconciliation blocks (plus the
        // received block) in reverse order, from oldest to newest, because
        // they were requested from high to low block height.
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
        new_blocks.reverse();

        // Reset the fork resolution state since we got all the way back to a
        // block that we have.
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
        peer_state.fork_reconciliation_blocks.clear();

        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
            // If `BlockNotification` was received during a block reconciliation
            // event, then the peer might have one (or more (unlikely)) blocks
            // that we do not have. We should thus request those blocks.
            if fork_reconciliation_event
                && peer_state.highest_shared_block_height > new_block_height
            {
                peer.send(PeerMessage::BlockRequestByHeight(
                    peer_state.highest_shared_block_height,
                ))
                .await?;
            }
        }

        Ok(())
    }
618

619
    /// Handle peer messages and returns Ok(true) if connection should be closed.
620
    /// Connection should also be closed if an error is returned.
621
    /// Otherwise, returns OK(false).
622
    ///
623
    /// Locking:
624
    ///   * Acquires `global_state_lock` for read.
625
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
626
    ///     `self.reward(..)`.
627
    async fn handle_peer_message<S>(
129✔
628
        &mut self,
129✔
629
        msg: PeerMessage,
129✔
630
        peer: &mut S,
129✔
631
        peer_state_info: &mut MutablePeerState,
129✔
632
    ) -> Result<bool>
129✔
633
    where
129✔
634
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
129✔
635
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
129✔
636
        <S as TryStream>::Error: std::error::Error,
129✔
637
    {
129✔
638
        debug!(
129✔
UNCOV
639
            "Received {} from peer {}",
×
UNCOV
640
            msg.get_type(),
×
641
            self.peer_address
642
        );
643
        match msg {
129✔
644
            PeerMessage::Bye => {
645
                // Note that the current peer is not removed from the global_state.peer_map here
646
                // but that this is done by the caller.
647
                info!("Got bye. Closing connection to peer");
58✔
648
                Ok(DISCONNECT_CONNECTION)
58✔
649
            }
650
            PeerMessage::PeerListRequest => {
651
                let peer_info = {
2✔
652
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
2✔
653

654
                    // We are interested in the address on which peers accept ingoing connections,
655
                    // not in the address in which they are connected to us. We are only interested in
656
                    // peers that accept incoming connections.
657
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
2✔
658
                        .global_state_lock
2✔
659
                        .lock_guard()
2✔
660
                        .await
2✔
661
                        .net
662
                        .peer_map
663
                        .values()
2✔
664
                        .filter(|peer_info| peer_info.listen_address().is_some())
5✔
665
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
2✔
666
                        .map(|peer_info| {
5✔
667
                            (
5✔
668
                                // unwrap is safe bc of above `filter`
5✔
669
                                peer_info.listen_address().unwrap(),
5✔
670
                                peer_info.instance_id(),
5✔
671
                            )
5✔
672
                        })
5✔
673
                        .collect();
2✔
674

2✔
675
                    // We sort the returned list, so this function is easier to test
2✔
676
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
677
                    peer_info
2✔
678
                };
2✔
679

2✔
680
                debug!("Responding with: {:?}", peer_info);
2✔
681
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
2✔
682
                Ok(KEEP_CONNECTION_ALIVE)
2✔
683
            }
UNCOV
684
            PeerMessage::PeerListResponse(peers) => {
×
UNCOV
685
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
×
UNCOV
686

×
UNCOV
687
                if peers.len() > MAX_PEER_LIST_LENGTH {
×
UNCOV
688
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
UNCOV
689
                        .await?;
×
UNCOV
690
                }
×
UNCOV
691
                self.to_main_tx
×
UNCOV
692
                    .send(PeerTaskToMain::PeerDiscoveryAnswer(
×
UNCOV
693
                        peers,
×
UNCOV
694
                        self.distance + 1,
×
UNCOV
695
                    ))
×
UNCOV
696
                    .await?;
×
UNCOV
697
                Ok(KEEP_CONNECTION_ALIVE)
×
698
            }
699
            PeerMessage::BlockNotificationRequest => {
700
                debug!("Got BlockNotificationRequest");
×
701

UNCOV
702
                peer.send(PeerMessage::BlockNotification(
×
UNCOV
703
                    self.global_state_lock
×
UNCOV
704
                        .lock_guard()
×
UNCOV
705
                        .await
×
706
                        .chain
707
                        .light_state()
×
708
                        .into(),
×
709
                ))
×
710
                .await?;
×
711

UNCOV
712
                Ok(KEEP_CONNECTION_ALIVE)
×
713
            }
714
            PeerMessage::BlockNotification(block_notification) => {
4✔
715
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
716

717
                let (tip_header, sync_anchor_is_set) = {
4✔
718
                    let state = self.global_state_lock.lock_guard().await;
4✔
719
                    (
4✔
720
                        *state.chain.light_state().header(),
4✔
721
                        state.net.sync_anchor.is_some(),
4✔
722
                    )
4✔
723
                };
4✔
724
                debug!(
4✔
UNCOV
725
                    "Got BlockNotification of height {}. Own height is {}",
×
726
                    block_notification.height, tip_header.height
727
                );
728

729
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
4✔
730
                let now = self.now();
4✔
731
                let time_since_latest_successful_challenge = peer_state_info
4✔
732
                    .successful_sync_challenge_response_time
4✔
733
                    .map(|then| now - then);
4✔
734
                let cooldown_expired = time_since_latest_successful_challenge
4✔
735
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
4✔
736
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
4✔
737
                    &tip_header,
4✔
738
                    block_notification.height,
4✔
739
                    block_notification.cumulative_proof_of_work,
4✔
740
                    sync_mode_threshold,
4✔
741
                );
4✔
742
                if cooldown_expired && exceeds_sync_mode_threshold {
4✔
743
                    debug!("sync mode criterion satisfied.");
1✔
744

745
                    if peer_state_info.sync_challenge.is_some() {
1✔
UNCOV
746
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
747
                        return Ok(KEEP_CONNECTION_ALIVE);
×
748
                    }
1✔
749

1✔
750
                    info!(
1✔
UNCOV
751
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
752
                    );
753
                    let challenge = SyncChallenge::generate(
1✔
754
                        &block_notification,
1✔
755
                        tip_header.height,
1✔
756
                        self.rng.random(),
1✔
757
                    );
1✔
758
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
759
                        challenge,
1✔
760
                        block_notification.cumulative_proof_of_work,
1✔
761
                        self.now(),
1✔
762
                    ));
1✔
763

1✔
764
                    debug!("sending challenge ...");
1✔
765
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
766

767
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
768
                }
3✔
769

3✔
770
                peer_state_info.highest_shared_block_height = block_notification.height;
3✔
771
                let block_is_new = tip_header.cumulative_proof_of_work
3✔
772
                    < block_notification.cumulative_proof_of_work;
3✔
773

3✔
774
                debug!("block_is_new: {}", block_is_new);
3✔
775

776
                if block_is_new
3✔
777
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
3✔
778
                    && !sync_anchor_is_set
2✔
779
                    && !exceeds_sync_mode_threshold
2✔
780
                {
781
                    debug!(
1✔
UNCOV
782
                        "sending BlockRequestByHeight to peer for block with height {}",
×
783
                        block_notification.height
784
                    );
785
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
1✔
786
                        .await?;
1✔
787
                } else {
788
                    debug!(
2✔
UNCOV
789
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
UNCOV
790
                        block_notification.height,
×
UNCOV
791
                        block_is_new,
×
UNCOV
792
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
793
                    );
794
                }
795

796
                Ok(KEEP_CONNECTION_ALIVE)
3✔
797
            }
798
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
UNCOV
799
                let response = {
×
800
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
801

2✔
802
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
803

804
                    let response = self
2✔
805
                        .global_state_lock
2✔
806
                        .lock_guard()
2✔
807
                        .await
2✔
808
                        .response_to_sync_challenge(sync_challenge)
2✔
809
                        .await;
2✔
810

811
                    match response {
2✔
812
                        Ok(resp) => resp,
×
813
                        Err(e) => {
2✔
814
                            warn!("could not generate sync challenge response:\n{e}");
2✔
815
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
816
                                .await?;
2✔
817
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
818
                        }
819
                    }
820
                };
821

UNCOV
822
                info!(
×
UNCOV
823
                    "Responding to sync challenge from {}",
×
UNCOV
824
                    self.peer_address.ip()
×
825
                );
826
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
827
                    .await?;
×
828

UNCOV
829
                Ok(KEEP_CONNECTION_ALIVE)
×
830
            }
831
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
832
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
833

834
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
835
                info!(
1✔
UNCOV
836
                    "Got sync challenge response from {}",
×
UNCOV
837
                    self.peer_address.ip()
×
838
                );
839

840
                // The purpose of the sync challenge and sync challenge response
841
                // is to avoid going into sync mode based on a malicious target
842
                // fork. Instead of verifying that the claimed proof-of-work
843
                // number is correct (which would require sending and verifying,
844
                // at least, all blocks between luca (whatever that is) and the
845
                // claimed tip), we use a heuristic that requires less
846
                // communication and less verification work. The downside of
847
                // using a heuristic here is a nonzero false positive and false
848
                // negative rate. Note that the false negative event
849
                // (maliciously sending someone into sync mode based on a bogus
850
                // fork) still requires a significant amount of work from the
851
                // attacker, *in addition* to being lucky. Also, down the line
852
                // succinctness (and more specifically, recursive block
853
                // validation) obviates this entire subprotocol.
854

855
                // Did we issue a challenge?
856
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
UNCOV
857
                    warn!("Sync challenge response was not prompted.");
×
UNCOV
858
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
UNCOV
859
                        .await?;
×
860
                    return Ok(KEEP_CONNECTION_ALIVE);
×
861
                };
862

863
                // Reset the challenge, regardless of the response's success.
864
                peer_state_info.sync_challenge = None;
1✔
865

1✔
866
                // Does response match issued challenge?
1✔
867
                if !challenge_response.matches(issued_challenge) {
1✔
UNCOV
868
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
UNCOV
869
                        .await?;
×
UNCOV
870
                    return Ok(KEEP_CONNECTION_ALIVE);
×
871
                }
1✔
872

1✔
873
                // Does response verify?
1✔
874
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
875
                let now = self.now();
1✔
876
                if !challenge_response.is_valid(now).await {
1✔
UNCOV
877
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
UNCOV
878
                        .await?;
×
UNCOV
879
                    return Ok(KEEP_CONNECTION_ALIVE);
×
880
                }
1✔
881

882
                // Does cumulative proof-of-work evolve reasonably?
883
                let own_tip_header = *self
1✔
884
                    .global_state_lock
1✔
885
                    .lock_guard()
1✔
886
                    .await
1✔
887
                    .chain
888
                    .light_state()
1✔
889
                    .header();
1✔
890
                if !challenge_response
1✔
891
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
892
                {
UNCOV
893
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
UNCOV
894
                        .await?;
×
UNCOV
895
                    return Ok(KEEP_CONNECTION_ALIVE);
×
896
                }
1✔
897

1✔
898
                // Is there some specific (*i.e.*, not aggregate) proof of work?
1✔
899
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
UNCOV
900
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
UNCOV
901
                        .await?;
×
UNCOV
902
                    return Ok(KEEP_CONNECTION_ALIVE);
×
903
                }
1✔
904

1✔
905
                // Did it come in time?
1✔
906
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
UNCOV
907
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
UNCOV
908
                        .await?;
×
UNCOV
909
                    return Ok(KEEP_CONNECTION_ALIVE);
×
910
                }
1✔
911

1✔
912
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
913
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
914

1✔
915
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
916
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
917

1✔
918
                // Inform main loop
1✔
919
                self.to_main_tx
1✔
920
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
921
                        peer_address: self.peer_address,
1✔
922
                        claimed_height: claimed_tip_height,
1✔
923
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
924
                        claimed_block_mmra: sync_mmra_anchor,
1✔
925
                    })
1✔
926
                    .await?;
1✔
927

928
                Ok(KEEP_CONNECTION_ALIVE)
1✔
929
            }
930
            PeerMessage::BlockRequestByHash(block_digest) => {
×
931
                match self
×
UNCOV
932
                    .global_state_lock
×
UNCOV
933
                    .lock_guard()
×
UNCOV
934
                    .await
×
935
                    .chain
UNCOV
936
                    .archival_state()
×
937
                    .get_block(block_digest)
×
938
                    .await?
×
939
                {
940
                    None => {
941
                        // TODO: Consider punishing here
942
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
UNCOV
943
                        Ok(KEEP_CONNECTION_ALIVE)
×
944
                    }
UNCOV
945
                    Some(b) => {
×
UNCOV
946
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
UNCOV
947
                            .await?;
×
UNCOV
948
                        Ok(KEEP_CONNECTION_ALIVE)
×
949
                    }
950
                }
951
            }
952
            PeerMessage::BlockRequestByHeight(block_height) => {
4✔
953
                let block_response = {
3✔
954
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
4✔
955

4✔
956
                    debug!("Got BlockRequestByHeight of height {}", block_height);
4✔
957

958
                    let canonical_block_digest = self
4✔
959
                        .global_state_lock
4✔
960
                        .lock_guard()
4✔
961
                        .await
4✔
962
                        .chain
963
                        .archival_state()
4✔
964
                        .archival_block_mmr
4✔
965
                        .ammr()
4✔
966
                        .try_get_leaf(block_height.into())
4✔
967
                        .await;
4✔
968

969
                    let canonical_block_digest = match canonical_block_digest {
4✔
970
                        None => {
971
                            let own_tip_height = self
1✔
972
                                .global_state_lock
1✔
973
                                .lock_guard()
1✔
974
                                .await
1✔
975
                                .chain
976
                                .light_state()
1✔
977
                                .header()
1✔
978
                                .height;
1✔
979
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
980
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
981
                                .await?;
1✔
982

983
                            return Ok(KEEP_CONNECTION_ALIVE);
1✔
984
                        }
985
                        Some(digest) => digest,
3✔
986
                    };
987

988
                    let canonical_chain_block: Block = self
3✔
989
                        .global_state_lock
3✔
990
                        .lock_guard()
3✔
991
                        .await
3✔
992
                        .chain
993
                        .archival_state()
3✔
994
                        .get_block(canonical_block_digest)
3✔
995
                        .await?
3✔
996
                        .unwrap();
3✔
997

3✔
998
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
3✔
999
                };
3✔
1000

3✔
1001
                debug!("Sending block");
3✔
1002
                peer.send(block_response).await?;
3✔
1003
                debug!("Sent block");
3✔
1004
                Ok(KEEP_CONNECTION_ALIVE)
3✔
1005
            }
1006
            PeerMessage::Block(t_block) => {
20✔
1007
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
20✔
1008

20✔
1009
                info!(
20✔
UNCOV
1010
                    "Got new block from peer {}, height {}, mined {}",
×
UNCOV
1011
                    self.peer_address,
×
UNCOV
1012
                    t_block.header.height,
×
UNCOV
1013
                    t_block.header.timestamp.standard_format()
×
1014
                );
1015
                let new_block_height = t_block.header.height;
20✔
1016

1017
                let block = match Block::try_from(*t_block) {
20✔
1018
                    Ok(block) => Box::new(block),
20✔
UNCOV
1019
                    Err(e) => {
×
UNCOV
1020
                        warn!("Peer sent invalid block: {e:?}");
×
UNCOV
1021
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
UNCOV
1022
                            .await?;
×
1023

UNCOV
1024
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1025
                    }
1026
                };
1027

1028
                // Update the value for the highest known height that peer possesses iff
1029
                // we are not in a fork reconciliation state.
1030
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
20✔
1031
                    peer_state_info.highest_shared_block_height = new_block_height;
10✔
1032
                }
10✔
1033

1034
                self.try_ensure_path(block, peer, peer_state_info).await?;
20✔
1035

1036
                // Reward happens as part of `try_ensure_path`
1037

1038
                Ok(KEEP_CONNECTION_ALIVE)
19✔
1039
            }
1040
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
1041
                known_blocks,
8✔
1042
                max_response_len,
8✔
1043
                anchor,
8✔
1044
            }) => {
8✔
1045
                debug!(
8✔
UNCOV
1046
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
1047
                    self.peer_address
1048
                );
1049

1050
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
UNCOV
1051
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
UNCOV
1052
                        .await?;
×
1053

UNCOV
1054
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1055
                }
8✔
1056

1057
                // The last block in the list of the peers known block is the
1058
                // earliest block, block with lowest height, the peer has
1059
                // requested. If it does not belong to canonical chain, none of
1060
                // the later will. So we can do an early abort in that case.
1061
                let least_preferred = match known_blocks.last() {
8✔
1062
                    Some(least_preferred) => *least_preferred,
8✔
1063
                    None => {
UNCOV
1064
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
UNCOV
1065
                            .await?;
×
1066

UNCOV
1067
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1068
                    }
1069
                };
1070

1071
                let state = self.global_state_lock.lock_guard().await;
8✔
1072
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
1073
                let luca_is_known = state
8✔
1074
                    .chain
8✔
1075
                    .archival_state()
8✔
1076
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
1077
                    .await;
8✔
1078
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
UNCOV
1079
                    drop(state);
×
UNCOV
1080
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
UNCOV
1081
                        .await?;
×
UNCOV
1082
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1083

UNCOV
1084
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1085
                }
8✔
1086

1087
                // Happy case: At least *one* of the blocks referenced by peer
1088
                // is known to us.
1089
                let first_block_in_response = {
8✔
1090
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1091
                    for block_digest in known_blocks {
10✔
1092
                        if state
10✔
1093
                            .chain
10✔
1094
                            .archival_state()
10✔
1095
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1096
                            .await
10✔
1097
                        {
1098
                            let height = state
8✔
1099
                                .chain
8✔
1100
                                .archival_state()
8✔
1101
                                .get_block_header(block_digest)
8✔
1102
                                .await
8✔
1103
                                .unwrap()
8✔
1104
                                .height;
8✔
1105
                            first_block_in_response = Some(height);
8✔
1106
                            debug!(
8✔
1107
                                "Found block in canonical chain for batch response: {}",
×
1108
                                block_digest
1109
                            );
1110
                            break;
8✔
1111
                        }
2✔
1112
                    }
1113

1114
                    first_block_in_response
8✔
1115
                        .expect("existence of LUCA should have been established already.")
8✔
1116
                };
8✔
1117

8✔
1118
                debug!(
8✔
UNCOV
1119
                    "Peer's most preferred block has height {first_block_in_response}.\
×
UNCOV
1120
                 Now building response from that height."
×
1121
                );
1122

1123
                // Get the relevant blocks, at most batch-size many, descending from the
1124
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1125
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1126
                let max_response_len = cmp::min(
8✔
1127
                    max_response_len,
8✔
1128
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1129
                );
8✔
1130
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1131
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1132

8✔
1133
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1134
                let response_start_height: u64 = first_block_in_response.into();
8✔
1135
                let mut i: u64 = 1;
8✔
1136
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1137
                    let block_height = response_start_height + i;
31✔
1138
                    match state
31✔
1139
                        .chain
31✔
1140
                        .archival_state()
31✔
1141
                        .archival_block_mmr
31✔
1142
                        .ammr()
31✔
1143
                        .try_get_leaf(block_height)
31✔
1144
                        .await
31✔
1145
                    {
1146
                        Some(digest) => {
23✔
1147
                            digests_of_returned_blocks.push(digest);
23✔
1148
                        }
23✔
1149
                        None => break,
8✔
1150
                    }
1151
                    i += 1;
23✔
1152
                }
1153

1154
                let mut returned_blocks: Vec<Block> =
8✔
1155
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1156
                for block_digest in digests_of_returned_blocks {
31✔
1157
                    let block = state
23✔
1158
                        .chain
23✔
1159
                        .archival_state()
23✔
1160
                        .get_block(block_digest)
23✔
1161
                        .await?
23✔
1162
                        .unwrap();
23✔
1163
                    returned_blocks.push(block);
23✔
1164
                }
1165

1166
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1167

1168
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1169
                drop(state);
8✔
1170

1171
                let Some(response) = response else {
8✔
UNCOV
1172
                    warn!("Unable to satisfy batch-block request");
×
UNCOV
1173
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1174
                        .await?;
×
1175
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1176
                };
1177

1178
                debug!("Returning {} blocks in batch response", response.len());
8✔
1179

1180
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1181
                peer.send(response).await?;
8✔
1182

1183
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1184
            }
1185
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1186
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
UNCOV
1187

×
UNCOV
1188
                debug!(
×
UNCOV
1189
                    "handling block response batch with {} blocks",
×
1190
                    authenticated_blocks.len()
×
1191
                );
1192

1193
                // (Alan:) why is there even a minimum?
UNCOV
1194
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1195
                    warn!("Got smaller batch response than allowed");
×
UNCOV
1196
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
UNCOV
1197
                        .await?;
×
1198
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1199
                }
×
1200

1201
                // Verify that we are in fact in syncing mode
1202
                // TODO: Separate peer messages into those allowed under syncing
1203
                // and those that are not
1204
                let Some(sync_anchor) = self
×
UNCOV
1205
                    .global_state_lock
×
UNCOV
1206
                    .lock_guard()
×
1207
                    .await
×
1208
                    .net
1209
                    .sync_anchor
1210
                    .clone()
×
1211
                else {
1212
                    warn!("Received a batch of blocks without being in syncing mode");
×
1213
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
UNCOV
1214
                        .await?;
×
UNCOV
1215
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1216
                };
1217

1218
                // Verify that the response matches the current state
1219
                // We get the latest block from the DB here since this message is
1220
                // only valid for archival nodes.
1221
                let (first_block, _) = &authenticated_blocks[0];
×
1222
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1223
                let most_canonical_own_block_match: Option<Block> = self
×
UNCOV
1224
                    .global_state_lock
×
UNCOV
1225
                    .lock_guard()
×
UNCOV
1226
                    .await
×
1227
                    .chain
UNCOV
1228
                    .archival_state()
×
1229
                    .get_block(first_blocks_parent_digest)
×
1230
                    .await
×
1231
                    .expect("Block lookup must succeed");
×
1232
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
UNCOV
1233
                    Some(block) => block,
×
1234
                    None => {
UNCOV
1235
                        warn!("Got batch response with invalid start block");
×
UNCOV
1236
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
UNCOV
1237
                            .await?;
×
UNCOV
1238
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1239
                    }
1240
                };
1241

1242
                // Convert all blocks to Block objects
UNCOV
1243
                debug!(
×
UNCOV
1244
                    "Found own block of height {} to match received batch",
×
UNCOV
1245
                    most_canonical_own_block_match.kernel.header.height
×
1246
                );
UNCOV
1247
                let mut received_blocks = vec![];
×
UNCOV
1248
                for (t_block, membership_proof) in authenticated_blocks {
×
1249
                    let Ok(block) = Block::try_from(t_block) else {
×
1250
                        warn!("Received invalid transfer block from peer");
×
1251
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1252
                            .await?;
×
UNCOV
1253
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1254
                    };
1255

UNCOV
1256
                    if !membership_proof.verify(
×
1257
                        block.header().height.into(),
×
1258
                        block.hash(),
×
1259
                        &sync_anchor.block_mmr.peaks(),
×
1260
                        sync_anchor.block_mmr.num_leafs(),
×
UNCOV
1261
                    ) {
×
UNCOV
1262
                        warn!("Authentication of received block fails relative to anchor");
×
UNCOV
1263
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
UNCOV
1264
                            .await?;
×
UNCOV
1265
                        return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1266
                    }
×
UNCOV
1267

×
UNCOV
1268
                    received_blocks.push(block);
×
1269
                }
1270

1271
                // Get the latest block that we know of and handle all received blocks
UNCOV
1272
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
UNCOV
1273
                    .await?;
×
1274

1275
                // Reward happens as part of `handle_blocks`.
1276

UNCOV
1277
                Ok(KEEP_CONNECTION_ALIVE)
×
1278
            }
1279
            PeerMessage::UnableToSatisfyBatchRequest => {
UNCOV
1280
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
UNCOV
1281
                warn!(
×
UNCOV
1282
                    "Peer {} reports inability to satisfy batch request.",
×
1283
                    self.peer_address
1284
                );
1285

UNCOV
1286
                Ok(KEEP_CONNECTION_ALIVE)
×
1287
            }
1288
            PeerMessage::Handshake(_) => {
UNCOV
1289
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1290

×
1291
                // The handshake should have been sent during connection
×
1292
                // initialization. Here it is out of order at best, malicious at
×
UNCOV
1293
                // worst.
×
UNCOV
1294
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1295
                Ok(KEEP_CONNECTION_ALIVE)
×
1296
            }
1297
            PeerMessage::ConnectionStatus(_) => {
1298
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1299

×
1300
                // The connection status should have been sent during connection
×
1301
                // initialization. Here it is out of order at best, malicious at
×
1302
                // worst.
×
1303

×
1304
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1305
                Ok(KEEP_CONNECTION_ALIVE)
×
1306
            }
1307
            PeerMessage::Transaction(transaction) => {
2✔
1308
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
2✔
1309

2✔
1310
                debug!(
2✔
UNCOV
1311
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1312
                    transaction.kernel.inputs.len(),
×
UNCOV
1313
                    transaction.kernel.outputs.len(),
×
UNCOV
1314
                    transaction.kernel.mutator_set_hash
×
1315
                );
1316

1317
                let transaction: Transaction = (*transaction).into();
2✔
1318

2✔
1319
                // 1. If transaction is invalid, punish.
2✔
1320
                if !transaction.is_valid().await {
2✔
1321
                    warn!("Received invalid tx");
×
1322
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
UNCOV
1323
                        .await?;
×
UNCOV
1324
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1325
                }
2✔
1326

2✔
1327
                // 2. If transaction has coinbase, punish.
2✔
1328
                // Transactions received from peers have not been mined yet.
2✔
1329
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
2✔
1330
                if transaction.kernel.coinbase.is_some() {
2✔
1331
                    warn!("Received non-mined transaction with coinbase.");
×
1332
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1333
                        .await?;
×
1334
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1335
                }
2✔
1336

2✔
1337
                // 3. If negative fee, punish.
2✔
1338
                if transaction.kernel.fee.is_negative() {
2✔
UNCOV
1339
                    warn!("Received negative-fee transaction.");
×
UNCOV
1340
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
UNCOV
1341
                        .await?;
×
UNCOV
1342
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1343
                }
2✔
1344

2✔
1345
                // 4. If transaction is already known, ignore.
2✔
1346
                if self
2✔
1347
                    .global_state_lock
2✔
1348
                    .lock_guard()
2✔
1349
                    .await
2✔
1350
                    .mempool
1351
                    .contains_with_higher_proof_quality(
1352
                        transaction.kernel.txid(),
2✔
1353
                        transaction.proof.proof_quality()?,
2✔
1354
                    )
1355
                {
UNCOV
1356
                    warn!("Received transaction that was already known");
×
1357

1358
                    // We received a transaction that we *probably* haven't requested.
1359
                    // Consider punishing here, if this is abused.
UNCOV
1360
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1361
                }
2✔
1362

1363
                // 5. if transaction is not confirmable, punish.
1364
                let mutator_set_accumulator_after = self
2✔
1365
                    .global_state_lock
2✔
1366
                    .lock_guard()
2✔
1367
                    .await
2✔
1368
                    .chain
1369
                    .light_state()
2✔
1370
                    .mutator_set_accumulator_after();
2✔
1371
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
2✔
1372
                    warn!(
×
1373
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
UNCOV
1374
                        transaction.kernel.txid()
×
1375
                    );
1376
                    // get fine-grained error code for informative logging
UNCOV
1377
                    let confirmability_error_code = transaction
×
UNCOV
1378
                        .kernel
×
UNCOV
1379
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
UNCOV
1380
                    match confirmability_error_code {
×
UNCOV
1381
                        Ok(_) => unreachable!(),
×
UNCOV
1382
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
UNCOV
1383
                            warn!("invalid removal record (at index {index})");
×
UNCOV
1384
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
UNCOV
1385
                            let removal_record_error_code = invalid_removal_record
×
UNCOV
1386
                                .validate_inner(&mutator_set_accumulator_after);
×
UNCOV
1387
                            debug!(
×
UNCOV
1388
                                "Absolute index set of removal record {index}: {:?}",
×
1389
                                invalid_removal_record.absolute_indices
1390
                            );
UNCOV
1391
                            match removal_record_error_code {
×
UNCOV
1392
                                Ok(_) => unreachable!(),
×
1393
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
UNCOV
1394
                                    debug!("invalid because membership proof is missing");
×
1395
                                }
1396
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
UNCOV
1397
                                    chunk_index,
×
UNCOV
1398
                                }) => {
×
UNCOV
1399
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1400
                                }
1401
                            };
UNCOV
1402
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
UNCOV
1403
                                .await?;
×
UNCOV
1404
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1405
                        }
1406
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
UNCOV
1407
                            warn!("duplicate inputs");
×
UNCOV
1408
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
UNCOV
1409
                                .await?;
×
UNCOV
1410
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1411
                        }
UNCOV
1412
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
UNCOV
1413
                            warn!("already spent input (at index {index})");
×
UNCOV
1414
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
UNCOV
1415
                                .await?;
×
UNCOV
1416
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1417
                        }
1418
                    };
1419
                }
2✔
1420

2✔
1421
                // If transaction cannot be applied to mutator set, punish.
2✔
1422
                // I don't think this can happen when above checks pass but we include
2✔
1423
                // the check to ensure that transaction can be applied.
2✔
1424
                let ms_update = MutatorSetUpdate::new(
2✔
1425
                    transaction.kernel.inputs.clone(),
2✔
1426
                    transaction.kernel.outputs.clone(),
2✔
1427
                );
2✔
1428
                let can_apply = ms_update
2✔
1429
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
2✔
1430
                    .is_ok();
2✔
1431
                if !can_apply {
2✔
UNCOV
1432
                    warn!("Cannot apply transaction to current mutator set.");
×
1433
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1434
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1435
                        .await?;
×
1436
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1437
                }
2✔
1438

2✔
1439
                let tx_timestamp = transaction.kernel.timestamp;
2✔
1440

2✔
1441
                // 6. Ignore if transaction is too old
2✔
1442
                let now = self.now();
2✔
1443
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
2✔
1444
                    // TODO: Consider punishing here
1445
                    warn!("Received too old tx");
×
UNCOV
1446
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1447
                }
2✔
1448

2✔
1449
                // 7. Ignore if transaction is too far into the future
2✔
1450
                if tx_timestamp
2✔
1451
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
2✔
1452
                {
1453
                    // TODO: Consider punishing here
UNCOV
1454
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
UNCOV
1455
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1456
                }
2✔
1457

1458
                // Otherwise, relay to main
1459
                let pt2m_transaction = PeerTaskToMainTransaction {
2✔
1460
                    transaction,
2✔
1461
                    confirmable_for_block: self
2✔
1462
                        .global_state_lock
2✔
1463
                        .lock_guard()
2✔
1464
                        .await
2✔
1465
                        .chain
1466
                        .light_state()
2✔
1467
                        .hash(),
2✔
1468
                };
2✔
1469
                self.to_main_tx
2✔
1470
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
2✔
1471
                    .await?;
2✔
1472

1473
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1474
            }
1475
            PeerMessage::TransactionNotification(tx_notification) => {
6✔
1476
                // addresses #457
6✔
1477
                // new scope for state read-lock to avoid holding across peer.send()
6✔
1478
                {
6✔
1479
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
6✔
1480

1481
                    // 1. Ignore if we already know this transaction, and
1482
                    // the proof quality is not higher than what we already know.
1483
                    let state = self.global_state_lock.lock_guard().await;
6✔
1484
                    let transaction_of_same_or_higher_proof_quality_is_known =
6✔
1485
                        state.mempool.contains_with_higher_proof_quality(
6✔
1486
                            tx_notification.txid,
6✔
1487
                            tx_notification.proof_quality,
6✔
1488
                        );
6✔
1489
                    if transaction_of_same_or_higher_proof_quality_is_known {
6✔
1490
                        debug!("transaction with same or higher proof quality was already known");
4✔
1491
                        return Ok(KEEP_CONNECTION_ALIVE);
4✔
1492
                    }
2✔
1493

2✔
1494
                    // Only accept transactions that do not require executing
2✔
1495
                    // `update`.
2✔
1496
                    if state
2✔
1497
                        .chain
2✔
1498
                        .light_state()
2✔
1499
                        .mutator_set_accumulator_after()
2✔
1500
                        .hash()
2✔
1501
                        != tx_notification.mutator_set_hash
2✔
1502
                    {
UNCOV
1503
                        debug!("transaction refers to non-canonical mutator set state");
×
UNCOV
1504
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1505
                    }
2✔
1506
                }
2✔
1507

2✔
1508
                // 2. Request the actual `Transaction` from peer
2✔
1509
                debug!("requesting transaction from peer");
2✔
1510
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
2✔
1511
                    .await?;
2✔
1512

1513
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1514
            }
1515
            PeerMessage::TransactionRequest(transaction_identifier) => {
×
1516
                if let Some(transaction) = self
×
1517
                    .global_state_lock
×
1518
                    .lock_guard()
×
UNCOV
1519
                    .await
×
1520
                    .mempool
1521
                    .get(transaction_identifier)
×
1522
                {
UNCOV
1523
                    if let Ok(transfer_transaction) = transaction.try_into() {
×
UNCOV
1524
                        peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
×
UNCOV
1525
                            .await?;
×
1526
                    } else {
UNCOV
1527
                        warn!("Peer requested transaction that cannot be converted to transfer object");
×
1528
                    }
UNCOV
1529
                }
×
1530

UNCOV
1531
                Ok(KEEP_CONNECTION_ALIVE)
×
1532
            }
1533
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1534
                let verdict = self
1✔
1535
                    .global_state_lock
1✔
1536
                    .lock_guard()
1✔
1537
                    .await
1✔
1538
                    .favor_incoming_block_proposal(
1✔
1539
                        block_proposal_notification.height,
1✔
1540
                        block_proposal_notification.guesser_fee,
1✔
1541
                    );
1✔
1542
                match verdict {
1✔
1543
                    Ok(_) => {
1544
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1545
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1546
                        ))
1✔
1547
                        .await?
1✔
1548
                    }
UNCOV
1549
                    Err(reject_reason) => info!(
×
UNCOV
1550
                        "Got unfavorable block proposal notification from {} peer; rejecting. Reason:\n{reject_reason}",
×
1551
                        self.peer_address
1552
                    ),
1553
                }
1554

1555
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1556
            }
UNCOV
1557
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
UNCOV
1558
                let matching_proposal = self
×
UNCOV
1559
                    .global_state_lock
×
UNCOV
1560
                    .lock_guard()
×
UNCOV
1561
                    .await
×
1562
                    .mining_state
1563
                    .block_proposal
UNCOV
1564
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
UNCOV
1565
                    .map(|x| x.to_owned());
×
UNCOV
1566
                if let Some(proposal) = matching_proposal {
×
UNCOV
1567
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
UNCOV
1568
                        .await?;
×
1569
                } else {
UNCOV
1570
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1571
                        .await?;
×
1572
                }
1573

1574
                Ok(KEEP_CONNECTION_ALIVE)
×
1575
            }
1576
            PeerMessage::BlockProposal(block) => {
1✔
1577
                info!("Got block proposal from peer.");
1✔
1578

1579
                let should_punish = {
1✔
1580
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1581

1582
                    let (verdict, tip) = {
1✔
1583
                        let state = self.global_state_lock.lock_guard().await;
1✔
1584

1585
                        let verdict = state.favor_incoming_block_proposal(
1✔
1586
                            block.header().height,
1✔
1587
                            block.total_guesser_reward(),
1✔
1588
                        );
1✔
1589
                        let tip = state.chain.light_state().to_owned();
1✔
1590
                        (verdict, tip)
1✔
1591
                    };
1592

1593
                    if let Err(rejection_reason) = verdict {
1✔
1594
                        match rejection_reason {
×
1595
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
UNCOV
1596
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1597
                                if Some(received) == current =>
×
1598
                            {
×
1599
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1600
                                None
×
1601
                            }
1602
                            _ => {
1603
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1604
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1605
                            }
1606
                        }
1607
                    } else {
1608
                        // Verify validity and that proposal is child of current tip
1609
                        if block.is_valid(&tip, self.now()).await {
1✔
1610
                            None // all is well, no punishment.
1✔
1611
                        } else {
1612
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1613
                        }
1614
                    }
1615
                };
1616

1617
                if let Some(sanction) = should_punish {
1✔
1618
                    self.punish(sanction).await?;
×
1619
                } else {
1620
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1621
                        .await?;
1✔
1622

1623
                    // Valuable, new, hard-to-produce information. Reward peer.
1624
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1625
                }
1626

1627
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1628
            }
1629
            PeerMessage::BootstrapStatus(status) => {
20✔
1630
                self.handle_bootstrap_status_message(status).await?;
20✔
1631
                Ok(KEEP_CONNECTION_ALIVE)
20✔
1632
            }
1633
        }
1634
    }
129✔
1635

1636
    /// Send `msg` to the main task via the mpsc channel `to_main_tx`, logging
    /// if the send is slow.
    ///
    /// The channel could potentially fill up, in which case the `send()` will
    /// block until there is capacity. We wrap the `send()` in a slow-scope
    /// guard so that such a stall gets logged once it passes the slow-scope
    /// threshold.
    ///
    /// `line` is the caller's source line (callers pass `line!()`), used to
    /// label the slow-scope log entry with the call site.
    async fn send_to_main(
        &self,
        msg: PeerTaskToMain,
        line: u32,
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
        // we measure across the send() in case the channel ever fills up.
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));

        self.to_main_tx.send(msg).await
    }
1✔
1651

1652
    /// Handle message from main task. The boolean return value indicates if
1653
    /// the connection should be closed.
1654
    ///
1655
    /// Locking:
1656
    ///   * acquires `global_state_lock` for write via Self::punish()
1657
    async fn handle_main_task_message<S>(
×
1658
        &mut self,
×
1659
        msg: MainToPeerTask,
×
1660
        peer: &mut S,
×
1661
        peer_state_info: &mut MutablePeerState,
×
1662
    ) -> Result<bool>
×
1663
    where
×
1664
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
×
UNCOV
1665
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
×
1666
        <S as TryStream>::Error: std::error::Error,
×
1667
    {
×
1668
        debug!("Handling {} message from main in peer loop", msg.get_type());
×
1669
        match msg {
×
1670
            MainToPeerTask::Block(block) => {
×
1671
                // We don't currently differentiate whether a new block came from a peer, or from our
×
1672
                // own miner. It's always shared through this logic.
×
1673
                let new_block_height = block.kernel.header.height;
×
UNCOV
1674
                if new_block_height > peer_state_info.highest_shared_block_height {
×
UNCOV
1675
                    debug!("Sending PeerMessage::BlockNotification");
×
1676
                    peer_state_info.highest_shared_block_height = new_block_height;
×
UNCOV
1677
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
×
UNCOV
1678
                        .await?;
×
UNCOV
1679
                    debug!("Sent PeerMessage::BlockNotification");
×
UNCOV
1680
                }
×
UNCOV
1681
                Ok(KEEP_CONNECTION_ALIVE)
×
1682
            }
UNCOV
1683
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
UNCOV
1684
                // Only ask one of the peers about the batch of blocks
×
UNCOV
1685
                if batch_block_request.peer_addr_target != self.peer_address {
×
UNCOV
1686
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1687
                }
×
UNCOV
1688

×
UNCOV
1689
                let max_response_len = std::cmp::min(
×
UNCOV
1690
                    STANDARD_BLOCK_BATCH_SIZE,
×
UNCOV
1691
                    self.global_state_lock.cli().sync_mode_threshold,
×
UNCOV
1692
                );
×
UNCOV
1693

×
UNCOV
1694
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
UNCOV
1695
                    known_blocks: batch_block_request.known_blocks,
×
UNCOV
1696
                    max_response_len,
×
UNCOV
1697
                    anchor: batch_block_request.anchor_mmr,
×
1698
                }))
×
1699
                .await?;
×
1700

1701
                Ok(KEEP_CONNECTION_ALIVE)
×
1702
            }
UNCOV
1703
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
UNCOV
1704
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1705

×
1706
                if self.peer_address != socket_addr {
×
UNCOV
1707
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1708
                }
×
UNCOV
1709

×
UNCOV
1710
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
UNCOV
1711
                    .await?;
×
1712

1713
                // If this peer failed the last synchronization attempt, we only
1714
                // sanction, we don't disconnect.
UNCOV
1715
                Ok(KEEP_CONNECTION_ALIVE)
×
1716
            }
1717
            MainToPeerTask::MakePeerDiscoveryRequest => {
UNCOV
1718
                peer.send(PeerMessage::PeerListRequest).await?;
×
1719
                Ok(KEEP_CONNECTION_ALIVE)
×
1720
            }
UNCOV
1721
            MainToPeerTask::Disconnect(peer_address) => {
×
1722
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
UNCOV
1723

×
UNCOV
1724
                // Only disconnect from the peer the main task requested a disconnect for.
×
UNCOV
1725
                if peer_address != self.peer_address {
×
UNCOV
1726
                    return Ok(KEEP_CONNECTION_ALIVE);
×
UNCOV
1727
                }
×
UNCOV
1728
                self.register_peer_disconnection().await;
×
1729

UNCOV
1730
                Ok(DISCONNECT_CONNECTION)
×
1731
            }
1732
            MainToPeerTask::DisconnectAll() => {
UNCOV
1733
                self.register_peer_disconnection().await;
×
1734

UNCOV
1735
                Ok(DISCONNECT_CONNECTION)
×
1736
            }
UNCOV
1737
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
UNCOV
1738
                if target_socket_addr == self.peer_address {
×
UNCOV
1739
                    peer.send(PeerMessage::PeerListRequest).await?;
×
UNCOV
1740
                }
×
UNCOV
1741
                Ok(KEEP_CONNECTION_ALIVE)
×
1742
            }
1743
            MainToPeerTask::TransactionNotification(transaction_notification) => {
×
1744
                debug!("Sending PeerMessage::TransactionNotification");
×
1745
                peer.send(PeerMessage::TransactionNotification(
×
1746
                    transaction_notification,
×
1747
                ))
×
1748
                .await?;
×
1749
                debug!("Sent PeerMessage::TransactionNotification");
×
1750
                Ok(KEEP_CONNECTION_ALIVE)
×
1751
            }
1752
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1753
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1754
                peer.send(PeerMessage::BlockProposalNotification(
×
1755
                    block_proposal_notification,
×
UNCOV
1756
                ))
×
UNCOV
1757
                .await?;
×
1758
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1759
                Ok(KEEP_CONNECTION_ALIVE)
×
1760
            }
1761
        }
UNCOV
1762
    }
×
1763

1764
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
    /// or a message from main over the main-to-peer-tasks broadcast channel.
    ///
    /// Returns `Ok(())` on orderly shutdown (peer hung up, or a handler
    /// requested disconnection) and `Err` when the connection is torn down
    /// because of a transport error or a peer-message handler error.
    async fn run<S>(
        &mut self,
        mut peer: S,
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
        peer_state_info: &mut MutablePeerState,
    ) -> Result<()>
    where
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
        <S as TryStream>::Error: std::error::Error,
    {
        loop {
            select! {
                // Handle peer messages
                peer_message = peer.try_next() => {
                    let peer_address = self.peer_address;
                    // A transport-level receive error closes the connection
                    // with an error result.
                    let peer_message = match peer_message {
                        Ok(message) => message,
                        Err(err) => {
                            let msg = format!("Error when receiving from peer: {peer_address}");
                            error!("{msg}. Error: {err}");
                            bail!("{msg}. Closing connection.");
                        }
                    };
                    // `None` means the peer closed the stream; exit cleanly.
                    let Some(peer_message) = peer_message else {
                        info!("Peer {peer_address} closed connection.");
                        break;
                    };

                    // Some message types are only relevant in exactly one of
                    // the syncing / not-syncing states; drop the rest early
                    // without handling them.
                    let syncing =
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
                    let message_type = peer_message.get_type();
                    if peer_message.ignore_during_sync() && syncing {
                        debug!(
                            "Ignoring {message_type} message when syncing, from {peer_address}",
                        );
                        continue;
                    }
                    if peer_message.ignore_when_not_sync() && !syncing {
                        debug!(
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
                        );
                        continue;
                    }

                    // `Ok(true)` from the handler requests an orderly
                    // disconnect; a handler error logs and propagates out.
                    match self
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
                        .await
                    {
                        Ok(false) => {}
                        Ok(true) => {
                            info!("Closing connection to {peer_address}");
                            break;
                        }
                        Err(err) => {
                            warn!("Closing connection to {peer_address} because of error {err}.");
                            bail!("{err}");
                        }
                    };
                }

                // Handle messages from main task
                main_msg_res = from_main_rx.recv() => {
                    // A broken broadcast channel from main is unrecoverable
                    // for this task; panic rather than limp along.
                    let main_msg = main_msg_res
                        .unwrap_or_else(|e| panic!("Failed to read from main loop: {e}"));
                    // An error from the main-message handler is treated the
                    // same as an explicit disconnect request.
                    let close_connection = self
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
                        .await
                        .unwrap_or_else(|err| {
                            warn!("handle_main_task_message returned an error: {err}");
                            true
                        });

                    if close_connection {
                        info!(
                            "handle_main_task_message is closing the connection to {}",
                            self.peer_address
                        );
                        break;
                    }
                }
            }
        }

        Ok(())
    }
59✔
1852

1853
    /// Function called before entering the peer loop. Reads the potentially stored
1854
    /// peer standing from the database and does other book-keeping before entering
1855
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1856
    /// accepted for a connection for this loop to be entered. So we don't need
1857
    /// to check the standing again.
1858
    ///
1859
    /// Locking:
1860
    ///   * acquires `global_state_lock` for write
1861
    pub(crate) async fn run_wrapper<S>(
51✔
1862
        &mut self,
51✔
1863
        mut peer: S,
51✔
1864
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
51✔
1865
    ) -> Result<()>
51✔
1866
    where
51✔
1867
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
51✔
1868
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
51✔
1869
        <S as TryStream>::Error: std::error::Error,
51✔
1870
    {
51✔
1871
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1872

1873
        let cli_args = self.global_state_lock.cli().clone();
51✔
1874
        let global_state = self.global_state_lock.lock_guard().await;
51✔
1875

1876
        let standing = global_state
51✔
1877
            .net
51✔
1878
            .peer_databases
51✔
1879
            .peer_standings
51✔
1880
            .get(self.peer_address.ip())
51✔
1881
            .await
51✔
1882
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
51✔
1883

51✔
1884
        // Add peer to peer map
51✔
1885
        let peer_connection_info = PeerConnectionInfo::new(
51✔
1886
            self.peer_handshake_data.listen_port,
51✔
1887
            self.peer_address,
51✔
1888
            self.inbound_connection,
51✔
1889
        );
51✔
1890
        let new_peer = PeerInfo::new(
51✔
1891
            peer_connection_info,
51✔
1892
            &self.peer_handshake_data,
51✔
1893
            SystemTime::now(),
51✔
1894
            cli_args.peer_tolerance,
51✔
1895
        )
51✔
1896
        .with_standing(standing);
51✔
1897

51✔
1898
        // If timestamps are different, we currently just log a warning.
51✔
1899
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
51✔
1900
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
51✔
1901
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
51✔
1902
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
51✔
1903
        {
1904
            let own_datetime_utc: DateTime<Utc> =
×
1905
                new_peer.own_timestamp_connection_established.into();
×
1906
            let peer_datetime_utc: DateTime<Utc> =
×
UNCOV
1907
                new_peer.peer_timestamp_connection_established.into();
×
1908
            warn!(
×
1909
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
UNCOV
1910
                new_peer.connected_address(),
×
UNCOV
1911
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
UNCOV
1912
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1913
        }
51✔
1914

1915
        // There is potential for a race-condition in the peer_map here, as we've previously
1916
        // counted the number of entries and checked if instance ID was already connected. But
1917
        // this check could have been invalidated by other tasks so we perform it again
1918

1919
        if global_state
51✔
1920
            .net
51✔
1921
            .peer_map
51✔
1922
            .values()
51✔
1923
            .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
51✔
1924
        {
UNCOV
1925
            bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1926
        }
51✔
1927

51✔
1928
        if global_state.net.peer_map.len() >= cli_args.max_num_peers {
51✔
UNCOV
1929
            bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1930
        }
51✔
1931

51✔
1932
        if global_state.net.peer_map.contains_key(&self.peer_address) {
51✔
1933
            // This shouldn't be possible, unless the peer reports a different instance ID than
1934
            // for the other connection. Only a malignant client would do that.
UNCOV
1935
            bail!("Already connected to peer. Aborting connection");
×
1936
        }
51✔
1937
        drop(global_state);
51✔
1938

51✔
1939
        self.global_state_lock
51✔
1940
            .lock_mut(|s| s.net.peer_map.insert(self.peer_address, new_peer))
51✔
1941
            .await;
51✔
1942

1943
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1944
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
51✔
1945

51✔
1946
        // If peer indicates more canonical block, request a block notification to catch up ASAP
51✔
1947
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
51✔
1948
            > self
51✔
1949
                .global_state_lock
51✔
1950
                .lock_guard()
51✔
1951
                .await
51✔
1952
                .chain
1953
                .light_state()
51✔
1954
                .kernel
1955
                .header
1956
                .cumulative_proof_of_work
1957
        {
1958
            // Send block notification request to catch up ASAP, in case we're
1959
            // behind the newly-connected peer.
UNCOV
1960
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1961
        }
51✔
1962

1963
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
51✔
1964
        debug!("Exited peer loop for {}", self.peer_address);
51✔
1965

1966
        close_peer_connected_callback(
51✔
1967
            self.global_state_lock.clone(),
51✔
1968
            self.peer_address,
51✔
1969
            &self.to_main_tx,
51✔
1970
        )
51✔
1971
        .await;
51✔
1972

1973
        debug!("Ending peer loop for {}", self.peer_address);
51✔
1974

1975
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1976
        // feature to have for testing purposes.
1977
        res
51✔
1978
    }
51✔
1979

1980
    /// Register graceful peer disconnection in the global state.
1981
    ///
1982
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1983
    ///
1984
    /// # Locking:
1985
    ///   * acquires `global_state_lock` for write
1986
    ///
1987
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
UNCOV
1988
    async fn register_peer_disconnection(&mut self) {
×
UNCOV
1989
        let peer_id = self.peer_handshake_data.instance_id;
×
UNCOV
1990
        self.global_state_lock
×
UNCOV
1991
            .lock_guard_mut()
×
UNCOV
1992
            .await
×
1993
            .net
UNCOV
1994
            .register_peer_disconnection(peer_id, SystemTime::now());
×
UNCOV
1995
    }
×
1996
}
1997

1998
#[cfg(test)]
1999
mod peer_loop_tests {
2000
    use rand::rngs::StdRng;
2001
    use rand::Rng;
2002
    use rand::SeedableRng;
2003
    use tokio::sync::mpsc::error::TryRecvError;
2004
    use tracing_test::traced_test;
2005

2006
    use super::*;
2007
    use crate::config_models::cli_args;
2008
    use crate::config_models::network::Network;
2009
    use crate::job_queue::triton_vm::TritonVmJobQueue;
2010
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
2011
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
2012
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
2013
    use crate::models::peer::transaction_notification::TransactionNotification;
2014
    use crate::models::state::mempool::TransactionOrigin;
2015
    use crate::models::state::tx_proving_capability::TxProvingCapability;
2016
    use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
2017
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
2018
    use crate::tests::shared::fake_valid_block_for_tests;
2019
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
2020
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
2021
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
2022
    use crate::tests::shared::get_dummy_socket_address;
2023
    use crate::tests::shared::get_test_genesis_setup;
2024
    use crate::tests::shared::Action;
2025
    use crate::tests::shared::Mock;
2026

UNCOV
2027
    #[traced_test]
×
2028
    #[tokio::test]
2029
    async fn test_peer_loop_bye() -> Result<()> {
1✔
2030
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2031

1✔
2032
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2033
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1✔
2034

1✔
2035
        let peer_address = get_dummy_socket_address(2);
1✔
2036
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2037
        let mut peer_loop_handler =
1✔
2038
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1✔
2039
        peer_loop_handler
1✔
2040
            .run_wrapper(mock, from_main_rx_clone)
1✔
2041
            .await?;
1✔
2042

1✔
2043
        assert_eq!(
1✔
2044
            2,
1✔
2045
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
2046
            "peer map length must be back to 2 after goodbye"
1✔
2047
        );
1✔
2048

1✔
2049
        Ok(())
1✔
2050
    }
1✔
2051

UNCOV
2052
    #[traced_test]
    #[tokio::test]
    async fn test_peer_loop_peer_list() {
        // Scenario: a third peer asks for our peer list. The response must
        // contain the two pre-connected peers plus the requester itself.
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
                .await
                .unwrap();

        let mut peer_infos = state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .clone()
            .into_values()
            .collect::<Vec<_>>();
        // Sort for a deterministic expected response; map iteration order is
        // not guaranteed.
        peer_infos.sort_by_cached_key(|x| x.connected_address());
        let (peer_address0, instance_id0) = (
            peer_infos[0].connected_address(),
            peer_infos[0].instance_id(),
        );
        let (peer_address1, instance_id1) = (
            peer_infos[1].connected_address(),
            peer_infos[1].instance_id(),
        );

        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
        // The requesting peer is included in the peer-list response, since it
        // is registered in the peer map while its loop runs.
        let expected_response = vec![
            (peer_address0, instance_id0),
            (peer_address1, instance_id1),
            (sa2, hsd2.instance_id),
        ];
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::PeerListRequest),
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        let mut peer_loop_handler =
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        assert_eq!(
            2,
            state_lock.lock_guard().await.net.peer_map.len(),
            "peer map must have length 2 after saying goodbye to peer 2"
        );
    }
1✔
2105

UNCOV
2106
    #[traced_test]
×
2107
    #[tokio::test]
2108
    async fn different_genesis_test() -> Result<()> {
1✔
2109
        // In this scenario a peer provides another genesis block than what has been
1✔
2110
        // hardcoded. This should lead to the closing of the connection to this peer
1✔
2111
        // and a ban.
1✔
2112

1✔
2113
        let network = Network::Main;
1✔
2114
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2115
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2116
        assert_eq!(1000, state_lock.cli().peer_tolerance);
1✔
2117
        let peer_address = get_dummy_socket_address(0);
1✔
2118

1✔
2119
        // Although the database is empty, `get_latest_block` still returns the genesis block,
1✔
2120
        // since that block is hardcoded.
1✔
2121
        let mut different_genesis_block = state_lock
1✔
2122
            .lock_guard()
1✔
2123
            .await
1✔
2124
            .chain
1✔
2125
            .archival_state()
1✔
2126
            .get_tip()
1✔
2127
            .await;
1✔
2128

1✔
2129
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
1✔
2130
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
1✔
2131
            &different_genesis_block,
1✔
2132
            Timestamp::hours(1),
1✔
2133
            StdRng::seed_from_u64(5550001).random(),
1✔
2134
        )
1✔
2135
        .await;
1✔
2136
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
1✔
2137
            block_1_with_different_genesis.try_into().unwrap(),
1✔
2138
        )))]);
1✔
2139

1✔
2140
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2141
            to_main_tx.clone(),
1✔
2142
            state_lock.clone(),
1✔
2143
            peer_address,
1✔
2144
            hsd,
1✔
2145
            true,
1✔
2146
            1,
1✔
2147
        );
1✔
2148
        let res = peer_loop_handler
1✔
2149
            .run_wrapper(mock, from_main_rx_clone)
1✔
2150
            .await;
1✔
2151
        assert!(
1✔
2152
            res.is_err(),
1✔
2153
            "run_wrapper must return failure when genesis is different"
1✔
2154
        );
1✔
2155

1✔
2156
        match to_main_rx1.recv().await {
1✔
2157
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2158
            _ => bail!("Must receive remove of peer block max height"),
1✔
2159
        }
1✔
2160

1✔
2161
        // Verify that no further message was sent to main loop
1✔
2162
        match to_main_rx1.try_recv() {
1✔
2163
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2164
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2165
        };
1✔
2166

1✔
2167
        drop(to_main_tx);
1✔
2168

1✔
2169
        let peer_standing = state_lock
1✔
2170
            .lock_guard()
1✔
2171
            .await
1✔
2172
            .net
1✔
2173
            .get_peer_standing_from_database(peer_address.ip())
1✔
2174
            .await;
1✔
2175
        assert_eq!(
1✔
2176
            -i32::from(state_lock.cli().peer_tolerance),
1✔
2177
            peer_standing.unwrap().standing
1✔
2178
        );
1✔
2179
        assert_eq!(
1✔
2180
            NegativePeerSanction::DifferentGenesis,
1✔
2181
            peer_standing.unwrap().latest_punishment.unwrap().0
1✔
2182
        );
1✔
2183

1✔
2184
        Ok(())
1✔
2185
    }
1✔
2186

UNCOV
2187
    #[traced_test]
×
2188
    #[tokio::test]
2189
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
1✔
2190
    {
1✔
2191
        let args = cli_args::Args::default();
1✔
2192
        let network = args.network;
1✔
2193
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
1✔
2194
            get_test_genesis_setup(network, 0, args).await?;
1✔
2195

1✔
2196
        let peer_address = get_dummy_socket_address(0);
1✔
2197
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
1✔
2198
        let peer_id = peer_handshake_data.instance_id;
1✔
2199
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2200
            to_main_tx,
1✔
2201
            state_lock.clone(),
1✔
2202
            peer_address,
1✔
2203
            peer_handshake_data,
1✔
2204
            true,
1✔
2205
            1,
1✔
2206
        );
1✔
2207
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2208
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
1✔
2209

1✔
2210
        let global_state = state_lock.lock_guard().await;
1✔
2211
        assert!(global_state
1✔
2212
            .net
1✔
2213
            .last_disconnection_time_of_peer(peer_id)
1✔
2214
            .is_none());
1✔
2215

1✔
2216
        drop(to_main_rx);
1✔
2217
        drop(from_main_tx);
1✔
2218

1✔
2219
        Ok(())
1✔
2220
    }
1✔
2221

UNCOV
2222
    #[traced_test]
    #[tokio::test]
    async fn block_without_valid_pow_test() -> Result<()> {
        // In this scenario, a block without a valid PoW is received. This block should be rejected
        // by the peer loop and a notification should never reach the main loop.

        let network = Network::Main;
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let peer_address = get_dummy_socket_address(0);
        let genesis_block: Block = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_tip()
            .await;

        // Make a block whose hash is (almost certainly) above the implied
        // proof-of-work threshold, i.e. a block that fails the PoW check.
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
        )
        .await;

        // This *probably* is invalid PoW -- and needs to be for this test to
        // work.
        block_without_valid_pow.set_header_nonce(Digest::default());

        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_without_valid_pow.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let from_main_rx_clone = peer_broadcast_tx.subscribe();

        // Mock the clock to the block's timestamp so the timestamp check
        // cannot be the reason for rejection.
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            true,
            1,
            block_without_valid_pow.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .expect("sending (one) invalid block should not result in closed connection");

        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
            _ => bail!("Must receive remove of peer block max height"),
        }

        // Verify that no further message was sent to main loop
        match to_main_rx1.try_recv() {
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
        };

        // We need to have the transmitter in scope until we have received from it
        // otherwise the receiver will report the disconnected error when we attempt
        // to read from it. And the purpose is to verify that the channel is empty,
        // not that it has been closed.
        drop(to_main_tx);

        // Verify that peer standing was stored in database
        let standing = state_lock
            .lock_guard()
            .await
            .net
            .peer_databases
            .peer_standings
            .get(peer_address.ip())
            .await
            .unwrap();
        assert!(
            standing.standing < 0,
            "Peer must be sanctioned for sending a bad block"
        );

        Ok(())
    }
1✔
2311

UNCOV
2312
    #[traced_test]
×
2313
    #[tokio::test]
2314
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
1✔
2315
        // The scenario tested here is that a client receives a block that is already
1✔
2316
        // known and stored. The expected behavior is to ignore the block and not send
1✔
2317
        // a message to the main task.
1✔
2318

1✔
2319
        let network = Network::Main;
1✔
2320
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
1✔
2321
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2322
        let peer_address = get_dummy_socket_address(0);
1✔
2323
        let genesis_block: Block = Block::genesis(network);
1✔
2324

1✔
2325
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
2326
        let block_1 =
1✔
2327
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
1✔
2328
        assert!(
1✔
2329
            block_1.is_valid(&genesis_block, now).await,
1✔
2330
            "Block must be valid for this test to make sense"
1✔
2331
        );
1✔
2332
        alice.set_new_tip(block_1.clone()).await?;
1✔
2333

1✔
2334
        let mock_peer_messages = Mock::new(vec![
1✔
2335
            Action::Read(PeerMessage::Block(Box::new(
1✔
2336
                block_1.clone().try_into().unwrap(),
1✔
2337
            ))),
1✔
2338
            Action::Read(PeerMessage::Bye),
1✔
2339
        ]);
1✔
2340

1✔
2341
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2342

1✔
2343
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2344
            to_main_tx.clone(),
1✔
2345
            alice.clone(),
1✔
2346
            peer_address,
1✔
2347
            hsd,
1✔
2348
            false,
1✔
2349
            1,
1✔
2350
            block_1.header().timestamp,
1✔
2351
        );
1✔
2352
        alice_peer_loop_handler
1✔
2353
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
1✔
2354
            .await?;
1✔
2355

1✔
2356
        match to_main_rx1.recv().await {
1✔
2357
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2358
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
1✔
2359
        }
1✔
2360
        match to_main_rx1.try_recv() {
1✔
2361
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2362
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2363
        };
1✔
2364
        drop(to_main_tx);
1✔
2365

1✔
2366
        if !alice.lock_guard().await.net.peer_map.is_empty() {
1✔
2367
            bail!("peer map must be empty after closing connection gracefully");
1✔
2368
        }
1✔
2369

1✔
2370
        Ok(())
1✔
2371
    }
1✔
2372

UNCOV
2373
    #[traced_test]
    #[tokio::test]
    async fn block_request_batch_simple() {
        // Scenario: Six blocks (including genesis) are known. Peer requests
        // from all possible starting points, and client responds with the
        // correct list of blocks.
        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default())
                .await
                .unwrap();
        let genesis_block: Block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        let [block_1, block_2, block_3, block_4, block_5] =
            fake_valid_sequence_of_blocks_for_tests(
                &genesis_block,
                Timestamp::hours(1),
                StdRng::seed_from_u64(5550001).random(),
            )
            .await;
        // `blocks[i]` has height i; genesis is at index 0.
        let blocks = vec![
            genesis_block,
            block_1,
            block_2,
            block_3,
            block_4,
            block_5.clone(),
        ];
        // Genesis is already known; only store blocks 1..=5 as new tips.
        for block in blocks.iter().skip(1) {
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
        }

        // Anchor MMR accumulator against which batch responses are proven.
        let mmra = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        // Request starting from each possible known block (height 0 through 4);
        // the response must contain all strictly later blocks.
        for i in 0..=4 {
            let expected_response = {
                let state = state_lock.lock_guard().await;
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
                    .await
                    .unwrap()
            };
            let mock = Mock::new(vec![
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                    known_blocks: vec![blocks[i].hash()],
                    max_response_len: 14,
                    anchor: mmra.clone(),
                })),
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
                Action::Read(PeerMessage::Bye),
            ]);
            let mut peer_loop_handler = PeerLoopHandler::new(
                to_main_tx.clone(),
                state_lock.clone(),
                peer_address,
                hsd.clone(),
                false,
                1,
            );

            peer_loop_handler
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
                .await
                .unwrap();
        }
    }
1✔
2446

UNCOV
2447
    #[traced_test]
    #[tokio::test]
    async fn block_request_batch_in_order_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
        // are returned.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block: Block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);
        // Chain a: genesis -> 1 -> 2a -> 3a
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
        )
        .await;
        // Chain b forks off after block 1: 1 -> 2b -> 3b
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
        )
        .await;
        assert_ne!(block_2_b.hash(), block_2_a.hash());

        // Store in this order so that 3a ends up as the canonical tip.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        // Anchor MMR accumulator for proving batch-response membership.
        let anchor = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .archival_block_mmr
            .ammr()
            .to_accumulator_async()
            .await;
        // Expected response when peer only knows genesis: the full canonical
        // chain 1, 2a, 3a.
        let response_1 = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
                &anchor,
            )
            .await
            .unwrap()
        };

        let mut mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![genesis_block.hash()],
                max_response_len: 14,
                anchor: anchor.clone(),
            })),
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd.clone(),
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_1
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
            .await?;

        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
        let response_2 = {
            let state_lock = state_lock.lock_guard().await;
            PeerLoopHandler::batch_response(
                &state_lock,
                vec![block_2_a, block_3_a.clone()],
                &anchor,
            )
            .await
            .unwrap()
        };
        mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
                max_response_len: 14,
                anchor,
            })),
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        peer_loop_handler_2
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
1✔
2560

UNCOV
2561
    #[traced_test]
×
2562
    #[tokio::test]
2563
    async fn block_request_batch_out_of_order_test() -> Result<()> {
1✔
2564
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2565
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
1✔
2566
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
1✔
2567
        // The blocks will be supplied in the correct order but starting from the first digest in
1✔
2568
        // the list that is known and canonical.
1✔
2569

1✔
2570
        let network = Network::Main;
1✔
2571
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2572
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2573
        let genesis_block = Block::genesis(network);
1✔
2574
        let peer_address = get_dummy_socket_address(0);
1✔
2575
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2576
            &genesis_block,
1✔
2577
            Timestamp::hours(1),
1✔
2578
            StdRng::seed_from_u64(5550001).random(),
1✔
2579
        )
1✔
2580
        .await;
1✔
2581
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2582
            &block_1,
1✔
2583
            Timestamp::hours(1),
1✔
2584
            StdRng::seed_from_u64(5550002).random(),
1✔
2585
        )
1✔
2586
        .await;
1✔
2587
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2588

1✔
2589
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2590
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2591
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2592
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2593
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2594

1✔
2595
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2596
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
1✔
2597
        expected_anchor.append(block_3_a.hash());
1✔
2598
        let state_anchor = state_lock
1✔
2599
            .lock_guard()
1✔
2600
            .await
1✔
2601
            .chain
1✔
2602
            .archival_state()
1✔
2603
            .archival_block_mmr
1✔
2604
            .ammr()
1✔
2605
            .to_accumulator_async()
1✔
2606
            .await;
1✔
2607
        assert_eq!(
1✔
2608
            expected_anchor, state_anchor,
1✔
2609
            "Catching assumption about MMRA in tip and in archival state"
1✔
2610
        );
1✔
2611

1✔
2612
        let response = {
1✔
2613
            let state_lock = state_lock.lock_guard().await;
1✔
2614
            PeerLoopHandler::batch_response(
1✔
2615
                &state_lock,
1✔
2616
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
1✔
2617
                &expected_anchor,
1✔
2618
            )
1✔
2619
            .await
1✔
2620
            .unwrap()
1✔
2621
        };
1✔
2622
        let mock = Mock::new(vec![
1✔
2623
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2624
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
1✔
2625
                max_response_len: 14,
1✔
2626
                anchor: expected_anchor,
1✔
2627
            })),
1✔
2628
            // Since genesis block is the 1st known in the list of known blocks,
1✔
2629
            // its immediate descendant, block_1, is the first one returned.
1✔
2630
            Action::Write(PeerMessage::BlockResponseBatch(response)),
1✔
2631
            Action::Read(PeerMessage::Bye),
1✔
2632
        ]);
1✔
2633

1✔
2634
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2635
            to_main_tx.clone(),
1✔
2636
            state_lock.clone(),
1✔
2637
            peer_address,
1✔
2638
            hsd,
1✔
2639
            false,
1✔
2640
            1,
1✔
2641
            block_3_a.header().timestamp,
1✔
2642
        );
1✔
2643

1✔
2644
        peer_loop_handler_2
1✔
2645
            .run_wrapper(mock, from_main_rx_clone)
1✔
2646
            .await?;
1✔
2647

1✔
2648
        Ok(())
1✔
2649
    }
1✔
2650

UNCOV
2651
    #[traced_test]
×
2652
    #[tokio::test]
2653
    async fn request_unknown_height_doesnt_crash() {
1✔
2654
        // Scenario: Only genesis block is known. Peer requests block of height
1✔
2655
        // 2.
1✔
2656
        let network = Network::Main;
1✔
2657
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2658
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2659
                .await
1✔
2660
                .unwrap();
1✔
2661
        let peer_address = get_dummy_socket_address(0);
1✔
2662
        let mock = Mock::new(vec![
1✔
2663
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2664
            Action::Read(PeerMessage::Bye),
1✔
2665
        ]);
1✔
2666

1✔
2667
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2668
            to_main_tx.clone(),
1✔
2669
            state_lock.clone(),
1✔
2670
            peer_address,
1✔
2671
            hsd,
1✔
2672
            false,
1✔
2673
            1,
1✔
2674
        );
1✔
2675

1✔
2676
        // This will return error if seen read/write order does not match that of the
1✔
2677
        // mocked object.
1✔
2678
        peer_loop_handler
1✔
2679
            .run_wrapper(mock, from_main_rx_clone)
1✔
2680
            .await
1✔
2681
            .unwrap();
1✔
2682

1✔
2683
        // Verify that peer is sanctioned for this nonsense.
1✔
2684
        assert!(state_lock
1✔
2685
            .lock_guard()
1✔
2686
            .await
1✔
2687
            .net
1✔
2688
            .get_peer_standing_from_database(peer_address.ip())
1✔
2689
            .await
1✔
2690
            .unwrap()
1✔
2691
            .standing
1✔
2692
            .is_negative());
1✔
2693
    }
1✔
2694

UNCOV
2695
    #[traced_test]
    #[tokio::test]
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
        // returned.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let genesis_block = Block::genesis(network);
        let peer_address = get_dummy_socket_address(0);

        // Chain a: genesis -> 1 -> 2_a -> 3_a.
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
        )
        .await;
        // Chain b forks off after block 1: 1 -> 2_b -> 3_b.
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
            &block_1,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550002).random(),
        )
        .await;
        assert_ne!(block_2_a.hash(), block_2_b.hash());

        // Insert both forks; block_3_a is set as tip last, so the a-chain
        // ends up being the canonical one.
        state_lock.set_new_tip(block_1.clone()).await?;
        state_lock.set_new_tip(block_2_a.clone()).await?;
        state_lock.set_new_tip(block_2_b.clone()).await?;
        state_lock.set_new_tip(block_3_b.clone()).await?;
        state_lock.set_new_tip(block_3_a.clone()).await?;

        // Peer asks for heights 2 and 3; node must answer with the canonical
        // blocks 2_a and 3_a, not the b-fork blocks of the same heights.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
            Action::Write(PeerMessage::Block(Box::new(
                block_3_a.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_3_a.header().timestamp,
        );

        // This will return error if seen read/write order does not match that of the
        // mocked object.
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        Ok(())
    }
1✔
2756

UNCOV
2757
    #[traced_test]
×
2758
    #[tokio::test]
2759
    async fn receival_of_block_notification_height_1() {
1✔
2760
        // Scenario: client only knows genesis block. Then receives block
1✔
2761
        // notification of height 1. Must request block 1.
1✔
2762
        let network = Network::Main;
1✔
2763
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2764
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
1✔
2765
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2766
                .await
1✔
2767
                .unwrap();
1✔
2768
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2769
        let notification_height1 = (&block_1).into();
1✔
2770
        let mock = Mock::new(vec![
1✔
2771
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
1✔
2772
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
1✔
2773
            Action::Read(PeerMessage::Bye),
1✔
2774
        ]);
1✔
2775

1✔
2776
        let peer_address = get_dummy_socket_address(0);
1✔
2777
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2778
            to_main_tx.clone(),
1✔
2779
            state_lock.clone(),
1✔
2780
            peer_address,
1✔
2781
            hsd,
1✔
2782
            false,
1✔
2783
            1,
1✔
2784
            block_1.header().timestamp,
1✔
2785
        );
1✔
2786
        peer_loop_handler
1✔
2787
            .run_wrapper(mock, from_main_rx_clone)
1✔
2788
            .await
1✔
2789
            .unwrap();
1✔
2790

1✔
2791
        drop(to_main_rx1);
1✔
2792
    }
1✔
2793

UNCOV
2794
    #[traced_test]
    #[tokio::test]
    async fn receive_block_request_by_height_block_7() {
        // Scenario: client only knows blocks up to height 7. Then receives block-
        // request-by-height for height 7. Must respond with block 7.
        let network = Network::Main;
        let mut rng = StdRng::seed_from_u64(5552401);
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default())
                .await
                .unwrap();
        let genesis_block = Block::genesis(network);
        // Build a chain of 7 blocks on top of genesis; the last one is the tip.
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            rng.random(),
        )
        .await;
        let block7 = blocks.last().unwrap().to_owned();
        let tip_height: u64 = block7.header().height.into();
        assert_eq!(7, tip_height);

        for block in &blocks {
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
        }

        // Peer asks for height 7; node must answer with the tip block.
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
            Action::Write(block7_response),
            Action::Read(PeerMessage::Bye),
        ]);

        let peer_address = get_dummy_socket_address(0);
        let mut peer_loop_handler = PeerLoopHandler::new(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        drop(to_main_rx1);
    }
1✔
2843

UNCOV
2844
    #[traced_test]
    #[tokio::test]
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
        // Scenario: client only knows genesis block. Then receives block 1.

        let network = Network::Main;
        let mut rng = StdRng::seed_from_u64(5550001);
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let peer_address = get_dummy_socket_address(0);

        // Peer pushes block 1 unprompted, then disconnects.
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_1.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            false,
            1,
            block_1.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        // Verify that a block was sent to `main_loop`
        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
            _ => bail!("Did not find msg sent to main task"),
        };

        // The peer loop must tell main to drop its tracked max block height
        // for this peer when the connection ends.
        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
            _ => bail!("Must receive remove of peer block max height"),
        }

        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
            bail!("peer map must be empty after closing connection gracefully");
        }

        Ok(())
    }
1✔
2893

UNCOV
2894
    #[traced_test]
    #[tokio::test]
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
        // In this scenario, the client only knows the genesis block (block 0) and then
        // receives block 2, meaning that block 1 will have to be requested.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let peer_address = get_dummy_socket_address(0);
        let genesis_block: Block = state_lock
            .lock_guard()
            .await
            .chain
            .archival_state()
            .get_tip()
            .await;
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
        )
        .await;

        // Peer sends block 2 first; the node must fetch the missing parent
        // (block 1) by hash before the connection closes.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_2.clone().try_into().unwrap(),
            ))),
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
            Action::Read(PeerMessage::Block(Box::new(
                block_1.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            true,
            1,
            block_2.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        // Blocks must be handed to the main loop in parent-first order.
        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
                if blocks[0].hash() != block_1.hash() {
                    bail!("1st received block by main loop must be block 1");
                }
                if blocks[1].hash() != block_2.hash() {
                    bail!("2nd received block by main loop must be block 2");
                }
            }
            _ => bail!("Did not find msg sent to main task 1"),
        };
        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
            _ => bail!("Must receive remove of peer block max height"),
        }

        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
            bail!("peer map must be empty after closing connection gracefully");
        }

        Ok(())
    }
1✔
2964

UNCOV
2965
    #[traced_test]
    #[tokio::test]
    async fn prevent_ram_exhaustion_test() -> Result<()> {
        // In this scenario the peer sends more blocks than the client allows to store in the
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
        // process as the alternative is that the program will crash because it runs out of RAM.

        let network = Network::Main;
        let mut rng = StdRng::seed_from_u64(5550001);
        let (
            _peer_broadcast_tx,
            from_main_rx_clone,
            to_main_tx,
            mut to_main_rx1,
            mut state_lock,
            _hsd,
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
        let genesis_block = Block::genesis(network);

        // Restrict max number of blocks held in memory to 2.
        let mut cli = state_lock.cli().clone();
        cli.sync_mode_threshold = 2;
        state_lock.set_cli(cli).await;

        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            rng.random(),
        )
        .await;
        state_lock.set_new_tip(block_1.clone()).await?;

        // Peer offers blocks 4 and 3. Holding those two already reaches the
        // configured limit, so the node must NOT request block 2; the
        // fork-reconciliation attempt is abandoned instead.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_4.clone().try_into().unwrap(),
            ))),
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
            Action::Read(PeerMessage::Block(Box::new(
                block_3.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address1,
            hsd1,
            true,
            1,
            block_4.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
            _ => bail!("Must receive remove of peer block max height"),
        }

        // Verify that no block is sent to main loop.
        match to_main_rx1.try_recv() {
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
        };
        drop(to_main_tx);

        // Verify that peer is sanctioned for failed fork reconciliation attempt
        assert!(state_lock
            .lock_guard()
            .await
            .net
            .get_peer_standing_from_database(peer_address1.ip())
            .await
            .unwrap()
            .standing
            .is_negative());

        Ok(())
    }
1✔
3047

UNCOV
3048
    #[traced_test]
    #[tokio::test]
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
        // In this scenario, the client knows the genesis block (block 0) and block 1. It
        // then receives block 4, meaning that blocks 3 and 2 will have to be requested.

        let network = Network::Main;
        let (
            _peer_broadcast_tx,
            from_main_rx_clone,
            to_main_tx,
            mut to_main_rx1,
            mut state_lock,
            hsd,
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
            .await
            .unwrap();
        let peer_address: SocketAddr = get_dummy_socket_address(0);
        let genesis_block = Block::genesis(network);
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
        )
        .await;
        state_lock.set_new_tip(block_1.clone()).await.unwrap();

        // Fork reconciliation: peer sends block 4, node walks backwards by
        // hash (3, then 2) until it reaches a block it already knows.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_4.clone().try_into().unwrap(),
            ))),
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
            Action::Read(PeerMessage::Block(Box::new(
                block_3.clone().try_into().unwrap(),
            ))),
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
            Action::Read(PeerMessage::Block(Box::new(
                block_2.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            true,
            1,
            block_4.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await
            .unwrap();

        // Blocks must reach the main loop in parent-first order: 2, 3, 4.
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
            panic!("Did not find msg sent to main task");
        };
        assert_eq!(blocks[0].hash(), block_2.hash());
        assert_eq!(blocks[1].hash(), block_3.hash());
        assert_eq!(blocks[2].hash(), block_4.hash());

        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
            panic!("Must receive remove of peer block max height");
        };

        assert!(
            state_lock.lock_guard().await.net.peer_map.is_empty(),
            "peer map must be empty after closing connection gracefully"
        );
    }
1✔
3120

UNCOV
3121
    #[traced_test]
    #[tokio::test]
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
        // In this scenario, the client only knows the genesis block (block 0) and then
        // receives block 3, meaning that blocks 2 and 1 will have to be requested.

        let network = Network::Main;
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
        let peer_address = get_dummy_socket_address(0);
        let genesis_block = Block::genesis(network);

        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
            &genesis_block,
            Timestamp::hours(1),
            StdRng::seed_from_u64(5550001).random(),
        )
        .await;

        // Fork reconciliation: peer sends block 3 first; node walks backwards
        // by hash (2, then 1) until it reaches the known genesis block.
        let mock = Mock::new(vec![
            Action::Read(PeerMessage::Block(Box::new(
                block_3.clone().try_into().unwrap(),
            ))),
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
            Action::Read(PeerMessage::Block(Box::new(
                block_2.clone().try_into().unwrap(),
            ))),
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
            Action::Read(PeerMessage::Block(Box::new(
                block_1.clone().try_into().unwrap(),
            ))),
            Action::Read(PeerMessage::Bye),
        ]);

        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
            to_main_tx.clone(),
            state_lock.clone(),
            peer_address,
            hsd,
            true,
            1,
            block_3.header().timestamp,
        );
        peer_loop_handler
            .run_wrapper(mock, from_main_rx_clone)
            .await?;

        // Blocks must be handed to the main loop in parent-first order.
        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
                if blocks[0].hash() != block_1.hash() {
                    bail!("1st received block by main loop must be block 1");
                }
                if blocks[1].hash() != block_2.hash() {
                    bail!("2nd received block by main loop must be block 2");
                }
                if blocks[2].hash() != block_3.hash() {
                    bail!("3rd received block by main loop must be block 3");
                }
            }
            _ => bail!("Did not find msg sent to main task"),
        };
        match to_main_rx1.recv().await {
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
            _ => bail!("Must receive remove of peer block max height"),
        }

        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
            bail!("peer map must be empty after closing connection gracefully");
        }

        Ok(())
    }
1✔
3193

UNCOV
3194
    #[traced_test]
×
3195
    #[tokio::test]
3196
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
1✔
3197
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
3198
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3199
        // But the requests are interrupted by the peer sending another message: a new block
1✔
3200
        // notification.
1✔
3201

1✔
3202
        let network = Network::Main;
1✔
3203
        let (
1✔
3204
            _peer_broadcast_tx,
1✔
3205
            from_main_rx_clone,
1✔
3206
            to_main_tx,
1✔
3207
            mut to_main_rx1,
1✔
3208
            mut state_lock,
1✔
3209
            hsd,
1✔
3210
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3211
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
1✔
3212
        let genesis_block: Block = state_lock
1✔
3213
            .lock_guard()
1✔
3214
            .await
1✔
3215
            .chain
1✔
3216
            .archival_state()
1✔
3217
            .get_tip()
1✔
3218
            .await;
1✔
3219

1✔
3220
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
3221
            fake_valid_sequence_of_blocks_for_tests(
1✔
3222
                &genesis_block,
1✔
3223
                Timestamp::hours(1),
1✔
3224
                StdRng::seed_from_u64(5550001).random(),
1✔
3225
            )
1✔
3226
            .await;
1✔
3227
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3228

1✔
3229
        let mock = Mock::new(vec![
1✔
3230
            Action::Read(PeerMessage::Block(Box::new(
1✔
3231
                block_4.clone().try_into().unwrap(),
1✔
3232
            ))),
1✔
3233
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3234
            Action::Read(PeerMessage::Block(Box::new(
1✔
3235
                block_3.clone().try_into().unwrap(),
1✔
3236
            ))),
1✔
3237
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3238
            //
1✔
3239
            // Now make the interruption of the block reconciliation process
1✔
3240
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
1✔
3241
            //
1✔
3242
            // Complete the block reconciliation process by requesting the last block
1✔
3243
            // in this process, to get back to a mutually known block.
1✔
3244
            Action::Read(PeerMessage::Block(Box::new(
1✔
3245
                block_2.clone().try_into().unwrap(),
1✔
3246
            ))),
1✔
3247
            //
1✔
3248
            // Then anticipate the request of the block that was announced
1✔
3249
            // in the interruption.
1✔
3250
            // Note that we cannot anticipate the response, as only the main
1✔
3251
            // task writes to the database. And the database needs to be updated
1✔
3252
            // for the handling of block 5 to be done correctly.
1✔
3253
            Action::Write(PeerMessage::BlockRequestByHeight(
1✔
3254
                block_5.kernel.header.height,
1✔
3255
            )),
1✔
3256
            Action::Read(PeerMessage::Bye),
1✔
3257
        ]);
1✔
3258

1✔
3259
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3260
            to_main_tx.clone(),
1✔
3261
            state_lock.clone(),
1✔
3262
            peer_socket_address,
1✔
3263
            hsd,
1✔
3264
            false,
1✔
3265
            1,
1✔
3266
            block_5.header().timestamp,
1✔
3267
        );
1✔
3268
        peer_loop_handler
1✔
3269
            .run_wrapper(mock, from_main_rx_clone)
1✔
3270
            .await?;
1✔
3271

1✔
3272
        match to_main_rx1.recv().await {
1✔
3273
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3274
                if blocks[0].hash() != block_2.hash() {
1✔
3275
                    bail!("1st received block by main loop must be block 1");
1✔
3276
                }
1✔
3277
                if blocks[1].hash() != block_3.hash() {
1✔
3278
                    bail!("2nd received block by main loop must be block 2");
1✔
3279
                }
1✔
3280
                if blocks[2].hash() != block_4.hash() {
1✔
3281
                    bail!("3rd received block by main loop must be block 3");
1✔
3282
                }
1✔
3283
            }
1✔
3284
            _ => bail!("Did not find msg sent to main task"),
1✔
3285
        };
1✔
3286
        match to_main_rx1.recv().await {
1✔
3287
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3288
            _ => bail!("Must receive remove of peer block max height"),
1✔
3289
        }
1✔
3290

1✔
3291
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3292
            bail!("peer map must be empty after closing connection gracefully");
1✔
3293
        }
1✔
3294

1✔
3295
        Ok(())
1✔
3296
    }
1✔
3297

UNCOV
3298
    #[traced_test]
×
3299
    #[tokio::test]
3300
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
1✔
3301
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
1✔
3302
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3303
        // But the requests are interrupted by the peer sending another message: a request
1✔
3304
        // for a list of peers.
1✔
3305

1✔
3306
        let network = Network::Main;
1✔
3307
        let (
1✔
3308
            _peer_broadcast_tx,
1✔
3309
            from_main_rx_clone,
1✔
3310
            to_main_tx,
1✔
3311
            mut to_main_rx1,
1✔
3312
            mut state_lock,
1✔
3313
            _hsd,
1✔
3314
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
3315
        let genesis_block = Block::genesis(network);
1✔
3316
        let peer_infos: Vec<PeerInfo> = state_lock
1✔
3317
            .lock_guard()
1✔
3318
            .await
1✔
3319
            .net
1✔
3320
            .peer_map
1✔
3321
            .clone()
1✔
3322
            .into_values()
1✔
3323
            .collect::<Vec<_>>();
1✔
3324

1✔
3325
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
3326
            &genesis_block,
1✔
3327
            Timestamp::hours(1),
1✔
3328
            StdRng::seed_from_u64(5550001).random(),
1✔
3329
        )
1✔
3330
        .await;
1✔
3331
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3332

1✔
3333
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3334
        let expected_peer_list_resp = vec![
1✔
3335
            (
1✔
3336
                peer_infos[0].listen_address().unwrap(),
1✔
3337
                peer_infos[0].instance_id(),
1✔
3338
            ),
1✔
3339
            (sa_1, hsd_1.instance_id),
1✔
3340
        ];
1✔
3341
        let mock = Mock::new(vec![
1✔
3342
            Action::Read(PeerMessage::Block(Box::new(
1✔
3343
                block_4.clone().try_into().unwrap(),
1✔
3344
            ))),
1✔
3345
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3346
            Action::Read(PeerMessage::Block(Box::new(
1✔
3347
                block_3.clone().try_into().unwrap(),
1✔
3348
            ))),
1✔
3349
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3350
            //
1✔
3351
            // Now make the interruption of the block reconciliation process
1✔
3352
            Action::Read(PeerMessage::PeerListRequest),
1✔
3353
            //
1✔
3354
            // Answer the request for a peer list
1✔
3355
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
1✔
3356
            //
1✔
3357
            // Complete the block reconciliation process by requesting the last block
1✔
3358
            // in this process, to get back to a mutually known block.
1✔
3359
            Action::Read(PeerMessage::Block(Box::new(
1✔
3360
                block_2.clone().try_into().unwrap(),
1✔
3361
            ))),
1✔
3362
            Action::Read(PeerMessage::Bye),
1✔
3363
        ]);
1✔
3364

1✔
3365
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3366
            to_main_tx,
1✔
3367
            state_lock.clone(),
1✔
3368
            sa_1,
1✔
3369
            hsd_1,
1✔
3370
            true,
1✔
3371
            1,
1✔
3372
            block_4.header().timestamp,
1✔
3373
        );
1✔
3374
        peer_loop_handler
1✔
3375
            .run_wrapper(mock, from_main_rx_clone)
1✔
3376
            .await?;
1✔
3377

1✔
3378
        // Verify that blocks are sent to `main_loop` in expected ordering
1✔
3379
        match to_main_rx1.recv().await {
1✔
3380
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3381
                if blocks[0].hash() != block_2.hash() {
1✔
3382
                    bail!("1st received block by main loop must be block 1");
1✔
3383
                }
1✔
3384
                if blocks[1].hash() != block_3.hash() {
1✔
3385
                    bail!("2nd received block by main loop must be block 2");
1✔
3386
                }
1✔
3387
                if blocks[2].hash() != block_4.hash() {
1✔
3388
                    bail!("3rd received block by main loop must be block 3");
1✔
3389
                }
1✔
3390
            }
1✔
3391
            _ => bail!("Did not find msg sent to main task"),
1✔
3392
        };
1✔
3393

1✔
3394
        assert_eq!(
1✔
3395
            1,
1✔
3396
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
3397
            "One peer must remain in peer list after peer_1 closed gracefully"
1✔
3398
        );
1✔
3399

1✔
3400
        Ok(())
1✔
3401
    }
1✔
3402

UNCOV
3403
    #[traced_test]
×
3404
    #[tokio::test]
3405
    async fn empty_mempool_request_tx_test() {
1✔
3406
        // In this scenario the client receives a transaction notification from
1✔
3407
        // a peer of a transaction it doesn't know; the client must then request it.
1✔
3408

1✔
3409
        let network = Network::Main;
1✔
3410
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
1✔
3411
            get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3412
                .await
1✔
3413
                .unwrap();
1✔
3414

1✔
3415
        let spending_key = state_lock
1✔
3416
            .lock_guard()
1✔
3417
            .await
1✔
3418
            .wallet_state
1✔
3419
            .wallet_entropy
1✔
3420
            .nth_symmetric_key_for_tests(0);
1✔
3421
        let genesis_block = Block::genesis(network);
1✔
3422
        let now = genesis_block.kernel.header.timestamp;
1✔
3423
        let (transaction_1, _, _change_output) = state_lock
1✔
3424
            .lock_guard()
1✔
3425
            .await
1✔
3426
            .create_transaction_with_prover_capability(
1✔
3427
                Default::default(),
1✔
3428
                spending_key.into(),
1✔
3429
                UtxoNotificationMedium::OffChain,
1✔
3430
                NativeCurrencyAmount::coins(0),
1✔
3431
                now,
1✔
3432
                TxProvingCapability::ProofCollection,
1✔
3433
                &TritonVmJobQueue::dummy(),
1✔
3434
            )
1✔
3435
            .await
1✔
3436
            .unwrap();
1✔
3437

1✔
3438
        // Build the resulting transaction notification
1✔
3439
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3440
        let mock = Mock::new(vec![
1✔
3441
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3442
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3443
            Action::Read(PeerMessage::Transaction(Box::new(
1✔
3444
                (&transaction_1).try_into().unwrap(),
1✔
3445
            ))),
1✔
3446
            Action::Read(PeerMessage::Bye),
1✔
3447
        ]);
1✔
3448

1✔
3449
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3450

1✔
3451
        // Mock a timestamp to allow transaction to be considered valid
1✔
3452
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3453
            to_main_tx,
1✔
3454
            state_lock.clone(),
1✔
3455
            get_dummy_socket_address(0),
1✔
3456
            hsd_1.clone(),
1✔
3457
            true,
1✔
3458
            1,
1✔
3459
            now,
1✔
3460
        );
1✔
3461

1✔
3462
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3463

1✔
3464
        assert!(
1✔
3465
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3466
            "Mempool must be empty at init"
1✔
3467
        );
1✔
3468
        peer_loop_handler
1✔
3469
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3470
            .await
1✔
3471
            .unwrap();
1✔
3472

1✔
3473
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
1✔
3474
        // by the `main_loop`.
1✔
3475
        match to_main_rx1.recv().await {
1✔
3476
            Some(PeerTaskToMain::Transaction(_)) => (),
1✔
3477
            _ => panic!("Must receive remove of peer block max height"),
1✔
3478
        };
1✔
3479
    }
1✔
3480

UNCOV
3481
    #[traced_test]
×
3482
    #[tokio::test]
3483
    async fn populated_mempool_request_tx_test() -> Result<()> {
1✔
3484
        // In this scenario the peer is informed of a transaction that it already knows
1✔
3485

1✔
3486
        let network = Network::Main;
1✔
3487
        let (
1✔
3488
            _peer_broadcast_tx,
1✔
3489
            from_main_rx_clone,
1✔
3490
            to_main_tx,
1✔
3491
            mut to_main_rx1,
1✔
3492
            mut state_lock,
1✔
3493
            _hsd,
1✔
3494
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3495
            .await
1✔
3496
            .unwrap();
1✔
3497
        let spending_key = state_lock
1✔
3498
            .lock_guard()
1✔
3499
            .await
1✔
3500
            .wallet_state
1✔
3501
            .wallet_entropy
1✔
3502
            .nth_symmetric_key_for_tests(0);
1✔
3503

1✔
3504
        let genesis_block = Block::genesis(network);
1✔
3505
        let now = genesis_block.kernel.header.timestamp;
1✔
3506
        let (transaction_1, _, _change_output) = state_lock
1✔
3507
            .lock_guard()
1✔
3508
            .await
1✔
3509
            .create_transaction_with_prover_capability(
1✔
3510
                Default::default(),
1✔
3511
                spending_key.into(),
1✔
3512
                UtxoNotificationMedium::OffChain,
1✔
3513
                NativeCurrencyAmount::coins(0),
1✔
3514
                now,
1✔
3515
                TxProvingCapability::ProofCollection,
1✔
3516
                &TritonVmJobQueue::dummy(),
1✔
3517
            )
1✔
3518
            .await
1✔
3519
            .unwrap();
1✔
3520

1✔
3521
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3522
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
3523
            to_main_tx,
1✔
3524
            state_lock.clone(),
1✔
3525
            get_dummy_socket_address(0),
1✔
3526
            hsd_1.clone(),
1✔
3527
            true,
1✔
3528
            1,
1✔
3529
        );
1✔
3530
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3531

1✔
3532
        assert!(
1✔
3533
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3534
            "Mempool must be empty at init"
1✔
3535
        );
1✔
3536
        state_lock
1✔
3537
            .lock_guard_mut()
1✔
3538
            .await
1✔
3539
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
1✔
3540
            .await
1✔
3541
            .unwrap();
1✔
3542
        assert!(
1✔
3543
            !state_lock.lock_guard().await.mempool.is_empty(),
1✔
3544
            "Mempool must be non-empty after insertion"
1✔
3545
        );
1✔
3546

1✔
3547
        // Run the peer loop and verify expected exchange -- namely that the
1✔
3548
        // tx notification is received and the the transaction is *not*
1✔
3549
        // requested.
1✔
3550
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3551
        let mock = Mock::new(vec![
1✔
3552
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3553
            Action::Read(PeerMessage::Bye),
1✔
3554
        ]);
1✔
3555
        peer_loop_handler
1✔
3556
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3557
            .await
1✔
3558
            .unwrap();
1✔
3559

1✔
3560
        // nothing is allowed to be sent to `main_loop`
1✔
3561
        match to_main_rx1.try_recv() {
1✔
3562
            Err(TryRecvError::Empty) => (),
1✔
3563
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
1✔
3564
            Ok(_) => panic!("to_main channel must be empty"),
1✔
3565
        };
1✔
3566
        Ok(())
1✔
3567
    }
1✔
3568

3569
    mod block_proposals {
3570
        use super::*;
3571
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3572

3573
        struct TestSetup {
3574
            peer_loop_handler: PeerLoopHandler,
3575
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3576
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3577
            peer_state: MutablePeerState,
3578
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3579
            genesis_block: Block,
3580
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3581
        }
3582

3583
        async fn genesis_setup(network: Network) -> TestSetup {
2✔
3584
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
2✔
3585
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2✔
3586
                    .await
2✔
3587
                    .unwrap();
2✔
3588
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3589
            let peer_loop_handler = PeerLoopHandler::new(
2✔
3590
                to_main_tx.clone(),
2✔
3591
                alice.clone(),
2✔
3592
                get_dummy_socket_address(0),
2✔
3593
                peer_hsd.clone(),
2✔
3594
                true,
2✔
3595
                1,
2✔
3596
            );
2✔
3597
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
2✔
3598

2✔
3599
            // (peer_loop_handler, to_main_rx1)
2✔
3600
            TestSetup {
2✔
3601
                peer_broadcast_tx,
2✔
3602
                peer_loop_handler,
2✔
3603
                to_main_rx,
2✔
3604
                from_main_rx,
2✔
3605
                peer_state,
2✔
3606
                to_main_tx,
2✔
3607
                genesis_block: Block::genesis(network),
2✔
3608
            }
2✔
3609
        }
2✔
3610

UNCOV
3611
        #[traced_test]
×
3612
        #[tokio::test]
3613
        async fn accept_block_proposal_height_one() {
1✔
3614
            // Node knows genesis block, receives a block proposal for block 1
1✔
3615
            // and must accept this. Verify that main loop is informed of block
1✔
3616
            // proposal.
1✔
3617
            let TestSetup {
1✔
3618
                peer_broadcast_tx,
1✔
3619
                mut peer_loop_handler,
1✔
3620
                mut to_main_rx,
1✔
3621
                from_main_rx,
1✔
3622
                mut peer_state,
1✔
3623
                to_main_tx,
1✔
3624
                genesis_block,
1✔
3625
            } = genesis_setup(Network::Main).await;
1✔
3626
            let block1 = fake_valid_block_for_tests(
1✔
3627
                &peer_loop_handler.global_state_lock,
1✔
3628
                StdRng::seed_from_u64(5550001).random(),
1✔
3629
            )
1✔
3630
            .await;
1✔
3631

1✔
3632
            let mock = Mock::new(vec![
1✔
3633
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
1✔
3634
                Action::Read(PeerMessage::Bye),
1✔
3635
            ]);
1✔
3636
            peer_loop_handler
1✔
3637
                .run(mock, from_main_rx, &mut peer_state)
1✔
3638
                .await
1✔
3639
                .unwrap();
1✔
3640

1✔
3641
            match to_main_rx.try_recv().unwrap() {
1✔
3642
                PeerTaskToMain::BlockProposal(block) => {
1✔
3643
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
1✔
3644
                }
1✔
3645
                _ => panic!("Expected main loop to be informed of block proposal"),
1✔
3646
            };
1✔
3647

1✔
3648
            drop(to_main_tx);
1✔
3649
            drop(peer_broadcast_tx);
1✔
3650
        }
1✔
3651

UNCOV
3652
        #[traced_test]
×
3653
        #[tokio::test]
3654
        async fn accept_block_proposal_notification_height_one() {
1✔
3655
            // Node knows genesis block, receives a block proposal notification
1✔
3656
            // for block 1 and must accept this by requesting the block
1✔
3657
            // proposal from peer.
1✔
3658
            let TestSetup {
1✔
3659
                peer_broadcast_tx,
1✔
3660
                mut peer_loop_handler,
1✔
3661
                to_main_rx: _,
1✔
3662
                from_main_rx,
1✔
3663
                mut peer_state,
1✔
3664
                to_main_tx,
1✔
3665
                ..
1✔
3666
            } = genesis_setup(Network::Main).await;
1✔
3667
            let block1 = fake_valid_block_for_tests(
1✔
3668
                &peer_loop_handler.global_state_lock,
1✔
3669
                StdRng::seed_from_u64(5550001).random(),
1✔
3670
            )
1✔
3671
            .await;
1✔
3672

1✔
3673
            let mock = Mock::new(vec![
1✔
3674
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
1✔
3675
                Action::Write(PeerMessage::BlockProposalRequest(
1✔
3676
                    BlockProposalRequest::new(block1.body().mast_hash()),
1✔
3677
                )),
1✔
3678
                Action::Read(PeerMessage::Bye),
1✔
3679
            ]);
1✔
3680
            peer_loop_handler
1✔
3681
                .run(mock, from_main_rx, &mut peer_state)
1✔
3682
                .await
1✔
3683
                .unwrap();
1✔
3684

1✔
3685
            drop(to_main_tx);
1✔
3686
            drop(peer_broadcast_tx);
1✔
3687
        }
1✔
3688
    }
3689

3690
    mod proof_qualities {
3691
        use strum::IntoEnumIterator;
3692

3693
        use super::*;
3694
        use crate::config_models::cli_args;
3695
        use crate::models::blockchain::transaction::Transaction;
3696
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3697
        use crate::tests::shared::mock_genesis_global_state;
3698

3699
        async fn tx_of_proof_quality(
2✔
3700
            network: Network,
2✔
3701
            quality: TransactionProofQuality,
2✔
3702
        ) -> Transaction {
2✔
3703
            let wallet_secret = WalletEntropy::devnet_wallet();
2✔
3704
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
2✔
3705
            let alice =
2✔
3706
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
2✔
3707
                    .await;
2✔
3708
            let alice = alice.lock_guard().await;
2✔
3709
            let genesis_block = alice.chain.light_state();
2✔
3710
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
2✔
3711
            let prover_capability = match quality {
2✔
3712
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
1✔
3713
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
1✔
3714
            };
3715
            alice
2✔
3716
                .create_transaction_with_prover_capability(
2✔
3717
                    vec![].into(),
2✔
3718
                    alice_key.into(),
2✔
3719
                    UtxoNotificationMedium::OffChain,
2✔
3720
                    NativeCurrencyAmount::coins(1),
2✔
3721
                    in_seven_months,
2✔
3722
                    prover_capability,
2✔
3723
                    &TritonVmJobQueue::dummy(),
2✔
3724
                )
2✔
3725
                .await
2✔
3726
                .unwrap()
2✔
3727
                .0
2✔
3728
        }
2✔
3729

UNCOV
3730
        #[traced_test]
×
3731
        #[tokio::test]
3732
        async fn client_favors_higher_proof_quality() {
1✔
3733
            // In this scenario the peer is informed of a transaction that it
1✔
3734
            // already knows, and it's tested that it checks the proof quality
1✔
3735
            // field and verifies that it exceeds the proof in the mempool
1✔
3736
            // before requesting the transasction.
1✔
3737
            let network = Network::Main;
1✔
3738
            let proof_collection_tx =
1✔
3739
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
1✔
3740
            let single_proof_tx =
1✔
3741
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
1✔
3742

1✔
3743
            for (own_tx_pq, new_tx_pq) in
4✔
3744
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
1✔
3745
            {
1✔
3746
                use TransactionProofQuality::*;
1✔
3747

1✔
3748
                let (
1✔
3749
                    _peer_broadcast_tx,
4✔
3750
                    from_main_rx_clone,
4✔
3751
                    to_main_tx,
4✔
3752
                    mut to_main_rx1,
4✔
3753
                    mut alice,
4✔
3754
                    handshake_data,
4✔
3755
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
4✔
3756
                    .await
4✔
3757
                    .unwrap();
4✔
3758

1✔
3759
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
4✔
3760
                    (ProofCollection, ProofCollection) => {
1✔
3761
                        (&proof_collection_tx, &proof_collection_tx)
1✔
3762
                    }
1✔
3763
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
1✔
3764
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
1✔
3765
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
1✔
3766
                };
1✔
3767

1✔
3768
                alice
4✔
3769
                    .lock_guard_mut()
4✔
3770
                    .await
4✔
3771
                    .mempool_insert(own_tx.to_owned(), TransactionOrigin::Foreign)
4✔
3772
                    .await
4✔
3773
                    .unwrap();
4✔
3774

4✔
3775
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
4✔
3776

4✔
3777
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4✔
3778
                let mock = if own_proof_is_supreme {
4✔
3779
                    Mock::new(vec![
3✔
3780
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3✔
3781
                        Action::Read(PeerMessage::Bye),
3✔
3782
                    ])
3✔
3783
                } else {
1✔
3784
                    Mock::new(vec![
1✔
3785
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3786
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3787
                        Action::Read(PeerMessage::Transaction(Box::new(
1✔
3788
                            new_tx.try_into().unwrap(),
1✔
3789
                        ))),
1✔
3790
                        Action::Read(PeerMessage::Bye),
1✔
3791
                    ])
1✔
3792
                };
1✔
3793

1✔
3794
                let now = proof_collection_tx.kernel.timestamp;
4✔
3795
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4✔
3796
                    to_main_tx,
4✔
3797
                    alice.clone(),
4✔
3798
                    get_dummy_socket_address(0),
4✔
3799
                    handshake_data.clone(),
4✔
3800
                    true,
4✔
3801
                    1,
4✔
3802
                    now,
4✔
3803
                );
4✔
3804
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4✔
3805

4✔
3806
                peer_loop_handler
4✔
3807
                    .run(mock, from_main_rx_clone, &mut peer_state)
4✔
3808
                    .await
4✔
3809
                    .unwrap();
4✔
3810

4✔
3811
                if own_proof_is_supreme {
4✔
3812
                    match to_main_rx1.try_recv() {
3✔
3813
                        Err(TryRecvError::Empty) => (),
3✔
3814
                        Err(TryRecvError::Disconnected) => {
1✔
3815
                            panic!("to_main channel must still be open")
1✔
3816
                        }
1✔
3817
                        Ok(_) => panic!("to_main channel must be empty"),
1✔
3818
                    }
1✔
3819
                } else {
1✔
3820
                    match to_main_rx1.try_recv() {
1✔
3821
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
1✔
3822
                        Err(TryRecvError::Disconnected) => {
1✔
3823
                            panic!("to_main channel must still be open")
1✔
3824
                        }
1✔
3825
                        Ok(PeerTaskToMain::Transaction(_)) => (),
1✔
3826
                        _ => panic!("Unexpected result from channel"),
1✔
3827
                    }
1✔
3828
                }
1✔
3829
            }
1✔
3830
        }
1✔
3831
    }
3832

3833
    mod sync_challenges {
3834
        use super::*;
3835
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3836

UNCOV
3837
        #[traced_test]
×
3838
        #[tokio::test]
3839
        async fn bad_sync_challenge_height_greater_than_tip() {
1✔
3840
            // Criterium: Challenge height may not exceed that of tip in the
1✔
3841
            // request.
1✔
3842

1✔
3843
            let network = Network::Main;
1✔
3844
            let (
1✔
3845
                _alice_main_to_peer_tx,
1✔
3846
                alice_main_to_peer_rx,
1✔
3847
                alice_peer_to_main_tx,
1✔
3848
                alice_peer_to_main_rx,
1✔
3849
                mut alice,
1✔
3850
                alice_hsd,
1✔
3851
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
3852
                .await
1✔
3853
                .unwrap();
1✔
3854
            let genesis_block: Block = Block::genesis(network);
1✔
3855
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
1✔
3856
                &genesis_block,
1✔
3857
                Timestamp::hours(1),
1✔
3858
                [0u8; 32],
1✔
3859
            )
1✔
3860
            .await;
1✔
3861
            for block in &blocks {
12✔
3862
                alice.set_new_tip(block.clone()).await.unwrap();
11✔
3863
            }
1✔
3864

1✔
3865
            let bh12 = blocks.last().unwrap().header().height;
1✔
3866
            let sync_challenge = SyncChallenge {
1✔
3867
                tip_digest: blocks[9].hash(),
1✔
3868
                challenges: [bh12; 10],
1✔
3869
            };
1✔
3870
            let alice_p2p_messages = Mock::new(vec![
1✔
3871
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3872
                Action::Read(PeerMessage::Bye),
1✔
3873
            ]);
1✔
3874

1✔
3875
            let peer_address = get_dummy_socket_address(0);
1✔
3876
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3877
                alice_peer_to_main_tx.clone(),
1✔
3878
                alice.clone(),
1✔
3879
                peer_address,
1✔
3880
                alice_hsd,
1✔
3881
                false,
1✔
3882
                1,
1✔
3883
            );
1✔
3884
            alice_peer_loop_handler
1✔
3885
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3886
                .await
1✔
3887
                .unwrap();
1✔
3888

1✔
3889
            drop(alice_peer_to_main_rx);
1✔
3890

1✔
3891
            let latest_sanction = alice
1✔
3892
                .lock_guard()
1✔
3893
                .await
1✔
3894
                .net
1✔
3895
                .get_peer_standing_from_database(peer_address.ip())
1✔
3896
                .await
1✔
3897
                .unwrap();
1✔
3898
            assert_eq!(
1✔
3899
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3900
                latest_sanction
1✔
3901
                    .latest_punishment
1✔
3902
                    .expect("peer must be sanctioned")
1✔
3903
                    .0
1✔
3904
            );
1✔
3905
        }
1✔
3906

UNCOV
3907
        #[traced_test]
×
3908
        #[tokio::test]
3909
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
1✔
3910
            // Criterium: Challenge may not point to genesis block, or block 1, as
1✔
3911
            // tip.
1✔
3912

1✔
3913
            let network = Network::Main;
1✔
3914
            let genesis_block: Block = Block::genesis(network);
1✔
3915

1✔
3916
            let alice_cli = cli_args::Args::default();
1✔
3917
            let (
1✔
3918
                _alice_main_to_peer_tx,
1✔
3919
                alice_main_to_peer_rx,
1✔
3920
                alice_peer_to_main_tx,
1✔
3921
                alice_peer_to_main_rx,
1✔
3922
                alice,
1✔
3923
                alice_hsd,
1✔
3924
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
1✔
3925

1✔
3926
            let sync_challenge = SyncChallenge {
1✔
3927
                tip_digest: genesis_block.hash(),
1✔
3928
                challenges: [BlockHeight::genesis(); 10],
1✔
3929
            };
1✔
3930

1✔
3931
            let alice_p2p_messages = Mock::new(vec![
1✔
3932
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3933
                Action::Read(PeerMessage::Bye),
1✔
3934
            ]);
1✔
3935

1✔
3936
            let peer_address = get_dummy_socket_address(0);
1✔
3937
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3938
                alice_peer_to_main_tx.clone(),
1✔
3939
                alice.clone(),
1✔
3940
                peer_address,
1✔
3941
                alice_hsd,
1✔
3942
                false,
1✔
3943
                1,
1✔
3944
            );
1✔
3945
            alice_peer_loop_handler
1✔
3946
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3947
                .await
1✔
3948
                .unwrap();
1✔
3949

1✔
3950
            drop(alice_peer_to_main_rx);
1✔
3951

1✔
3952
            let latest_sanction = alice
1✔
3953
                .lock_guard()
1✔
3954
                .await
1✔
3955
                .net
1✔
3956
                .get_peer_standing_from_database(peer_address.ip())
1✔
3957
                .await
1✔
3958
                .unwrap();
1✔
3959
            assert_eq!(
1✔
3960
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3961
                latest_sanction
1✔
3962
                    .latest_punishment
1✔
3963
                    .expect("peer must be sanctioned")
1✔
3964
                    .0
1✔
3965
            );
1✔
3966
        }
1✔
3967

UNCOV
3968
        #[traced_test]
×
3969
        #[tokio::test]
3970
        async fn sync_challenge_happy_path() -> Result<()> {
1✔
3971
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
1✔
3972
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
1✔
3973
            // sync mode.
1✔
3974

1✔
3975
            let mut rng = rand::rng();
1✔
3976
            let network = Network::Main;
1✔
3977
            let genesis_block: Block = Block::genesis(network);
1✔
3978

1✔
3979
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
1✔
3980
            let alice_cli = cli_args::Args {
1✔
3981
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
1✔
3982
                ..Default::default()
1✔
3983
            };
1✔
3984
            let (
1✔
3985
                _alice_main_to_peer_tx,
1✔
3986
                alice_main_to_peer_rx,
1✔
3987
                alice_peer_to_main_tx,
1✔
3988
                mut alice_peer_to_main_rx,
1✔
3989
                mut alice,
1✔
3990
                alice_hsd,
1✔
3991
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
1✔
3992
            let _alice_socket_address = get_dummy_socket_address(0);
1✔
3993

1✔
3994
            let (
1✔
3995
                _bob_main_to_peer_tx,
1✔
3996
                _bob_main_to_peer_rx,
1✔
3997
                _bob_peer_to_main_tx,
1✔
3998
                _bob_peer_to_main_rx,
1✔
3999
                mut bob,
1✔
4000
                _bob_hsd,
1✔
4001
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
4002
            let bob_socket_address = get_dummy_socket_address(0);
1✔
4003

1✔
4004
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
4005
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
1✔
4006
            assert!(
1✔
4007
                block_1.is_valid(&genesis_block, now).await,
1✔
4008
                "Block must be valid for this test to make sense"
1✔
4009
            );
1✔
4010
            let alice_tip = &block_1;
1✔
4011
            alice.set_new_tip(block_1.clone()).await?;
1✔
4012
            bob.set_new_tip(block_1.clone()).await?;
1✔
4013

1✔
4014
            // produce enough blocks to ensure alice needs to go into sync mode
1✔
4015
            // with this block notification.
1✔
4016
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
1✔
4017
                &block_1,
1✔
4018
                TARGET_BLOCK_INTERVAL,
1✔
4019
                rng.random(),
1✔
4020
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
1✔
4021
            )
1✔
4022
            .await;
1✔
4023
            for block in &blocks {
12✔
4024
                bob.set_new_tip(block.clone()).await?;
11✔
4025
            }
1✔
4026
            let bob_tip = blocks.last().unwrap();
1✔
4027

1✔
4028
            let block_notification_from_bob = PeerBlockNotification {
1✔
4029
                hash: bob_tip.hash(),
1✔
4030
                height: bob_tip.header().height,
1✔
4031
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
1✔
4032
            };
1✔
4033

1✔
4034
            let alice_rng_seed = rng.random::<[u8; 32]>();
1✔
4035
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
1✔
4036
            let sync_challenge_from_alice = SyncChallenge::generate(
1✔
4037
                &block_notification_from_bob,
1✔
4038
                alice_tip.header().height,
1✔
4039
                alice_rng_clone.random(),
1✔
4040
            );
1✔
4041

1✔
4042
            println!(
1✔
4043
                "sync challenge from alice:\n{:?}",
1✔
4044
                sync_challenge_from_alice
1✔
4045
            );
1✔
4046

1✔
4047
            let sync_challenge_response_from_bob = bob
1✔
4048
                .lock_guard()
1✔
4049
                .await
1✔
4050
                .response_to_sync_challenge(sync_challenge_from_alice)
1✔
4051
                .await
1✔
4052
                .expect("should be able to respond to sync challenge");
1✔
4053

1✔
4054
            let alice_p2p_messages = Mock::new(vec![
1✔
4055
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4056
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
1✔
4057
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
1✔
4058
                    sync_challenge_response_from_bob,
1✔
4059
                ))),
1✔
4060
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4061
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
1✔
4062
                // The absence of a Write here checks that a 2nd challenge isn't sent
1✔
4063
                // when a successful response was just received.
1✔
4064
                Action::Read(PeerMessage::Bye),
1✔
4065
            ]);
1✔
4066

1✔
4067
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
4068
                alice_peer_to_main_tx.clone(),
1✔
4069
                alice.clone(),
1✔
4070
                bob_socket_address,
1✔
4071
                alice_hsd,
1✔
4072
                false,
1✔
4073
                1,
1✔
4074
                bob_tip.header().timestamp,
1✔
4075
            );
1✔
4076
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
1✔
4077
            alice_peer_loop_handler
1✔
4078
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
4079
                .await?;
1✔
4080

1✔
4081
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
1✔
4082
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
1✔
4083
            expected_anchor_mmra.append(bob_tip.hash());
1✔
4084
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
4085
                peer_address: bob_socket_address,
1✔
4086
                claimed_height: bob_tip.header().height,
1✔
4087
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
1✔
4088
                claimed_block_mmra: expected_anchor_mmra,
1✔
4089
            };
1✔
4090
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
1✔
4091
            assert_eq!(
1✔
4092
                expected_message_from_alice_peer_loop,
1✔
4093
                observed_message_from_alice_peer_loop
1✔
4094
            );
1✔
4095

1✔
4096
            Ok(())
1✔
4097
        }
1✔
4098
    }
4099
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc