• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14034884859

24 Mar 2025 12:09PM UTC coverage: 84.255% (-0.004%) from 84.259%
14034884859

push

github

web-flow
style: Report guesser fee of rejected blocks

When rejecting a block proposal notification, an info message is logged showing
the reason why it was rejected. This commit changes that info message to include
the rejected proposal's guesser fee.

0 of 5 new or added lines in 1 file covered. (0.0%)

3 existing lines in 1 file now uncovered.

50762 of 60248 relevant lines covered (84.26%)

176184.3 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.4
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
// NOTE(review): presumably the default number of blocks in a batch response;
// not used in the visible part of this file -- confirm against its users.
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
66
// Upper bound on the number of entries in a peer list response. Own responses
// are truncated to this length, and peers sending longer lists are punished.
const MAX_PEER_LIST_LENGTH: usize = 10;
67
// NOTE(review): presumably the smallest accepted batch size for block batch
// requests; not used in the visible part of this file -- confirm.
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
68

69
// Named return values for the peer message handler, which reports `true` when
// the connection should be closed and `false` otherwise.
const KEEP_CONNECTION_ALIVE: bool = false;
70
const DISCONNECT_CONNECTION: bool = true;
71

72
// NOTE(review): presumably the numeric type of a peer's standing score --
// confirm against `PeerStanding`.
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP.
75
///
76
/// Also handles messages from the main task over the main-to-peer-tasks
77
/// broadcast channel.
78
#[derive(Debug, Clone)]
79
pub struct PeerLoopHandler {
80
    /// Channel for sending messages from this peer task to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
81
    /// Handle to the shared global state; locked for read or write as needed.
    global_state_lock: GlobalStateLock,
82
    /// Socket address of the connected peer; also the key under which the
    /// peer is looked up in the peer map.
    peer_address: SocketAddr,
83
    /// NOTE(review): presumably the handshake data received from the peer when
    /// the connection was established -- confirm against the caller.
    peer_handshake_data: HandshakeData,
84
    /// NOTE(review): presumably `true` if the peer initiated the connection --
    /// confirm against the caller.
    inbound_connection: bool,
85
    /// This peer's distance. Peers revealed by this peer are reported to the
    /// main task with a distance of `distance + 1`.
    distance: u8,
86
    /// Source of randomness, e.g. for generating sync challenges.
    rng: StdRng,
87
    /// Test-only clock override: when set, `now()` returns this timestamp
    /// instead of the current time.
    #[cfg(test)]
88
    mock_now: Option<Timestamp>,
89
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
20✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
20✔
94
        global_state_lock: GlobalStateLock,
20✔
95
        peer_address: SocketAddr,
20✔
96
        peer_handshake_data: HandshakeData,
20✔
97
        inbound_connection: bool,
20✔
98
        distance: u8,
20✔
99
    ) -> Self {
20✔
100
        Self {
20✔
101
            to_main_tx,
20✔
102
            global_state_lock,
20✔
103
            peer_address,
20✔
104
            peer_handshake_data,
20✔
105
            inbound_connection,
20✔
106
            distance,
20✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
20✔
108
            #[cfg(test)]
20✔
109
            mock_now: None,
20✔
110
        }
20✔
111
    }
20✔
112

113
    /// Like [`Self::new`], but with the handler's clock pinned to
    /// `mocked_time`, such that time dependencies may be tested.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);

        handler
    }
135

136
    /// Replace the handler's random number generator with the given one.
    ///
    /// Useful for derandomizing tests.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        let Self { rng: current, .. } = self;
        *current = rng;
    }
143

144
    fn now(&self) -> Timestamp {
27✔
145
        #[cfg(not(test))]
27✔
146
        {
27✔
147
            Timestamp::now()
27✔
148
        }
27✔
149
        #[cfg(test)]
27✔
150
        {
27✔
151
            self.mock_now.unwrap_or(Timestamp::now())
27✔
152
        }
27✔
153
    }
27✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
7✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
7✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
7✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
7✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
6✔
199
        if sanction_result.is_err() {
6✔
200
            error!("Cannot reward banned peer");
×
201
        }
6✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
6✔
204
    }
7✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

16✔
235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
255
    /// valid. Use with a list of blocks or a single block. When the
256
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
257
    /// list is the `i`th block. The parent of element zero in this list is
258
    /// `parent_of_first_block`.
259
    ///
260
    /// # Return Value
261
    ///  - `Err` when the connection should be closed;
262
    ///  - `Ok(None)` if some block is invalid
263
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
264
    ///    are not syncing;
265
    ///  - `Ok(None)` if the last block has insufficient height and we are
266
    ///    syncing;
267
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
268
    ///    highest height in the batch.
269
    ///
270
    /// A return value of Ok(Some(_)) means that the message was passed on to
271
    /// main loop.
272
    ///
273
    /// # Locking
274
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
275
    ///     `self.reward(..)`.
276
    ///
277
    /// # Panics
278
    ///
279
    ///  - Panics if called with the empty list.
280
    async fn handle_blocks(
8✔
281
        &mut self,
8✔
282
        received_blocks: Vec<Block>,
8✔
283
        parent_of_first_block: Block,
8✔
284
    ) -> Result<Option<BlockHeight>> {
8✔
285
        debug!(
8✔
286
            "attempting to validate {} {}",
×
287
            received_blocks.len(),
×
288
            if received_blocks.len() == 1 {
×
289
                "block"
×
290
            } else {
291
                "blocks"
×
292
            }
293
        );
294
        let now = self.now();
8✔
295
        debug!("validating with respect to current timestamp {now}");
8✔
296
        let mut previous_block = &parent_of_first_block;
8✔
297
        for new_block in &received_blocks {
24✔
298
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
17✔
299
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
17✔
300
            let new_block_is_valid = new_block.is_valid(previous_block, now).await;
17✔
301
            debug!("new block is valid? {new_block_is_valid}");
17✔
302
            if !new_block_has_proof_of_work {
17✔
303
                warn!(
1✔
304
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
305
                    new_block.kernel.header.height, self.peer_address
×
306
                );
307
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
308
                warn!(
1✔
309
                    "Proof of work should be {} (or more) but was [{}].",
×
310
                    previous_block.kernel.header.difficulty.target(),
×
311
                    new_block.hash().values().iter().join(", ")
×
312
                );
313
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
314
                    new_block.kernel.header.height,
1✔
315
                    new_block.hash(),
1✔
316
                )))
1✔
317
                .await?;
1✔
318
                warn!("Failed to validate block due to insufficient PoW");
1✔
319
                return Ok(None);
1✔
320
            } else if !new_block_is_valid {
16✔
321
                warn!(
×
322
                    "Received invalid block of height {} from peer with IP {}",
×
323
                    new_block.kernel.header.height, self.peer_address
×
324
                );
325
                self.punish(NegativePeerSanction::InvalidBlock((
×
326
                    new_block.kernel.header.height,
×
327
                    new_block.hash(),
×
328
                )))
×
329
                .await?;
×
330
                warn!("Failed to validate block: invalid block");
×
331
                return Ok(None);
×
332
            }
16✔
333
            info!(
16✔
334
                "Block with height {} is valid. mined: {}",
×
335
                new_block.kernel.header.height,
×
336
                new_block.kernel.header.timestamp.standard_format()
×
337
            );
338

339
            previous_block = new_block;
16✔
340
        }
341

342
        // evaluate the fork choice rule
343
        debug!("Checking last block's canonicity ...");
7✔
344
        let last_block = received_blocks.last().unwrap();
7✔
345
        let is_canonical = self
7✔
346
            .global_state_lock
7✔
347
            .lock_guard()
7✔
348
            .await
7✔
349
            .incoming_block_is_more_canonical(last_block);
7✔
350
        let last_block_height = last_block.header().height;
7✔
351
        let sync_mode_active_and_have_new_champion = self
7✔
352
            .global_state_lock
7✔
353
            .lock_guard()
7✔
354
            .await
7✔
355
            .net
356
            .sync_anchor
357
            .as_ref()
7✔
358
            .is_some_and(|x| {
7✔
359
                x.champion
×
360
                    .is_none_or(|(height, _)| height < last_block_height)
×
361
            });
7✔
362
        if !is_canonical && !sync_mode_active_and_have_new_champion {
7✔
363
            warn!(
1✔
364
                "Received {} blocks from peer but incoming blocks are less \
×
365
            canonical than current tip, or current sync-champion.",
×
366
                received_blocks.len()
×
367
            );
368
            return Ok(None);
1✔
369
        }
6✔
370

6✔
371
        // Send the new blocks to the main task which handles the state update
6✔
372
        // and storage to the database.
6✔
373
        let number_of_received_blocks = received_blocks.len();
6✔
374
        self.to_main_tx
6✔
375
            .send(PeerTaskToMain::NewBlocks(received_blocks))
6✔
376
            .await?;
6✔
377
        info!(
6✔
378
            "Updated block info by block from peer. block height {}",
×
379
            last_block_height
380
        );
381

382
        // Valuable, new, hard-to-produce information. Reward peer.
383
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
6✔
384
            .await?;
6✔
385

386
        Ok(Some(last_block_height))
6✔
387
    }
8✔
388

389
    /// Take a single block received from a peer and (attempt to) find a path
390
    /// between the received block and a common ancestor stored in the blocks
391
    /// database.
392
    ///
393
    /// This function attempts to find the parent of the received block, either
394
    /// by searching the database or by requesting it from a peer.
395
    ///  - If the parent is not stored, it is requested from the peer and the
396
    ///    received block is pushed to the fork reconciliation list for later
397
    ///    handling by this function. The fork reconciliation list starts out
398
    ///    empty, but grows as more parents are requested and transmitted.
399
    ///  - If the parent is found in the database, a) block handling continues:
400
    ///    the entire list of fork reconciliation blocks are passed down the
401
    ///    pipeline, potentially leading to a state update; and b) the fork
402
    ///    reconciliation list is cleared.
403
    ///
404
    /// Locking:
405
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
406
    ///     `self.reward(..)`.
407
    async fn try_ensure_path<S>(
20✔
408
        &mut self,
20✔
409
        received_block: Box<Block>,
20✔
410
        peer: &mut S,
20✔
411
        peer_state: &mut MutablePeerState,
20✔
412
    ) -> Result<()>
20✔
413
    where
20✔
414
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
20✔
415
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
20✔
416
        <S as TryStream>::Error: std::error::Error,
20✔
417
    {
20✔
418
        // Does the received block match the fork reconciliation list?
419
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
20✔
420
            peer_state.fork_reconciliation_blocks.last()
20✔
421
        {
422
            let valid = successor
10✔
423
                .is_valid(received_block.as_ref(), self.now())
10✔
424
                .await;
10✔
425
            if !valid {
10✔
426
                warn!(
×
427
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
428
                        peer_state.fork_reconciliation_blocks.len() + 1
×
429
                    );
430
            }
10✔
431
            valid
10✔
432
        } else {
433
            true
10✔
434
        };
435

436
        // Are we running out of RAM?
437
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
20✔
438
            >= self.global_state_lock.cli().sync_mode_threshold;
20✔
439
        if too_many_blocks {
20✔
440
            warn!(
1✔
441
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
442
                peer_state.fork_reconciliation_blocks.len() + 1
×
443
            );
444
        }
19✔
445

446
        // Block mismatch or too many blocks: abort!
447
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
20✔
448
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
449
                received_block.header().height,
1✔
450
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
451
                received_block.hash(),
1✔
452
            )))
1✔
453
            .await?;
1✔
454
            peer_state.fork_reconciliation_blocks = vec![];
1✔
455
            return Ok(());
1✔
456
        }
19✔
457

19✔
458
        // otherwise, append
19✔
459
        peer_state.fork_reconciliation_blocks.push(*received_block);
19✔
460

19✔
461
        // Try fetch parent
19✔
462
        let received_block_header = *peer_state
19✔
463
            .fork_reconciliation_blocks
19✔
464
            .last()
19✔
465
            .unwrap()
19✔
466
            .header();
19✔
467

19✔
468
        let parent_digest = received_block_header.prev_block_digest;
19✔
469
        let parent_height = received_block_header.height.previous()
19✔
470
            .expect("transferred block must have previous height because genesis block cannot be transferred");
19✔
471
        debug!("Try ensure path: fetching parent block");
19✔
472
        let parent_block = self
19✔
473
            .global_state_lock
19✔
474
            .lock_guard()
19✔
475
            .await
19✔
476
            .chain
477
            .archival_state()
19✔
478
            .get_block(parent_digest)
19✔
479
            .await?;
19✔
480
        debug!(
19✔
481
            "Completed parent block fetching from DB: {}",
×
482
            if parent_block.is_some() {
×
483
                "found".to_string()
×
484
            } else {
485
                "not found".to_string()
×
486
            }
487
        );
488

489
        // If parent is not known (but not genesis) request it.
490
        let Some(parent_block) = parent_block else {
19✔
491
            if parent_height.is_genesis() {
11✔
492
                peer_state.fork_reconciliation_blocks.clear();
1✔
493
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
494
                return Ok(());
×
495
            }
10✔
496
            info!(
10✔
497
                "Parent not known: Requesting previous block with height {} from peer",
×
498
                parent_height
499
            );
500

501
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
502
                .await?;
10✔
503

504
            return Ok(());
10✔
505
        };
506

507
        // We want to treat the received fork reconciliation blocks (plus the
508
        // received block) in reverse order, from oldest to newest, because
509
        // they were requested from high to low block height.
510
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
8✔
511
        new_blocks.reverse();
8✔
512

8✔
513
        // Reset the fork resolution state since we got all the way back to a
8✔
514
        // block that we have.
8✔
515
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
8✔
516
        peer_state.fork_reconciliation_blocks.clear();
8✔
517

518
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
8✔
519
            // If `BlockNotification` was received during a block reconciliation
520
            // event, then the peer might have one (or more (unlikely)) blocks
521
            // that we do not have. We should thus request those blocks.
522
            if fork_reconciliation_event
6✔
523
                && peer_state.highest_shared_block_height > new_block_height
6✔
524
            {
525
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
526
                    peer_state.highest_shared_block_height,
1✔
527
                ))
1✔
528
                .await?;
1✔
529
            }
5✔
530
        }
2✔
531

532
        Ok(())
8✔
533
    }
20✔
534

535
    /// Handle peer messages and returns Ok(true) if connection should be closed.
536
    /// Connection should also be closed if an error is returned.
537
    /// Otherwise, returns OK(false).
538
    ///
539
    /// Locking:
540
    ///   * Acquires `global_state_lock` for read.
541
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
542
    ///     `self.reward(..)`.
543
    async fn handle_peer_message<S>(
89✔
544
        &mut self,
89✔
545
        msg: PeerMessage,
89✔
546
        peer: &mut S,
89✔
547
        peer_state_info: &mut MutablePeerState,
89✔
548
    ) -> Result<bool>
89✔
549
    where
89✔
550
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
89✔
551
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
89✔
552
        <S as TryStream>::Error: std::error::Error,
89✔
553
    {
89✔
554
        debug!(
89✔
555
            "Received {} from peer {}",
×
556
            msg.get_type(),
×
557
            self.peer_address
558
        );
559
        match msg {
89✔
560
            PeerMessage::Bye => {
561
                // Note that the current peer is not removed from the global_state.peer_map here
562
                // but that this is done by the caller.
563
                info!("Got bye. Closing connection to peer");
38✔
564
                Ok(DISCONNECT_CONNECTION)
38✔
565
            }
566
            PeerMessage::PeerListRequest => {
567
                let peer_info = {
2✔
568
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
2✔
569

570
                    // We are interested in the address on which peers accept ingoing connections,
571
                    // not in the address in which they are connected to us. We are only interested in
572
                    // peers that accept incoming connections.
573
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
2✔
574
                        .global_state_lock
2✔
575
                        .lock_guard()
2✔
576
                        .await
2✔
577
                        .net
578
                        .peer_map
579
                        .values()
2✔
580
                        .filter(|peer_info| peer_info.listen_address().is_some())
5✔
581
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
2✔
582
                        .map(|peer_info| {
5✔
583
                            (
5✔
584
                                // unwrap is safe bc of above `filter`
5✔
585
                                peer_info.listen_address().unwrap(),
5✔
586
                                peer_info.instance_id(),
5✔
587
                            )
5✔
588
                        })
5✔
589
                        .collect();
2✔
590

2✔
591
                    // We sort the returned list, so this function is easier to test
2✔
592
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
593
                    peer_info
2✔
594
                };
2✔
595

2✔
596
                debug!("Responding with: {:?}", peer_info);
2✔
597
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
2✔
598
                Ok(KEEP_CONNECTION_ALIVE)
2✔
599
            }
600
            PeerMessage::PeerListResponse(peers) => {
×
601
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
×
602

×
603
                if peers.len() > MAX_PEER_LIST_LENGTH {
×
604
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
605
                        .await?;
×
606
                }
×
607
                self.to_main_tx
×
608
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
×
609
                        peers,
×
610
                        self.peer_address,
×
611
                        // The distance to the revealed peers is 1 + this peer's distance
×
612
                        self.distance + 1,
×
613
                    )))
×
614
                    .await?;
×
615
                Ok(KEEP_CONNECTION_ALIVE)
×
616
            }
617
            PeerMessage::BlockNotificationRequest => {
618
                debug!("Got BlockNotificationRequest");
×
619

620
                peer.send(PeerMessage::BlockNotification(
×
621
                    self.global_state_lock
×
622
                        .lock_guard()
×
623
                        .await
×
624
                        .chain
625
                        .light_state()
×
626
                        .into(),
×
627
                ))
×
628
                .await?;
×
629

630
                Ok(KEEP_CONNECTION_ALIVE)
×
631
            }
632
            PeerMessage::BlockNotification(block_notification) => {
4✔
633
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
634

635
                let (tip_header, sync_anchor_is_set) = {
4✔
636
                    let state = self.global_state_lock.lock_guard().await;
4✔
637
                    (
4✔
638
                        *state.chain.light_state().header(),
4✔
639
                        state.net.sync_anchor.is_some(),
4✔
640
                    )
4✔
641
                };
4✔
642
                debug!(
4✔
643
                    "Got BlockNotification of height {}. Own height is {}",
×
644
                    block_notification.height, tip_header.height
645
                );
646

647
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
4✔
648
                let now = self.now();
4✔
649
                let time_since_latest_successful_challenge = peer_state_info
4✔
650
                    .successful_sync_challenge_response_time
4✔
651
                    .map(|then| now - then);
4✔
652
                let cooldown_expired = time_since_latest_successful_challenge
4✔
653
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
4✔
654
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
4✔
655
                    &tip_header,
4✔
656
                    block_notification.height,
4✔
657
                    block_notification.cumulative_proof_of_work,
4✔
658
                    sync_mode_threshold,
4✔
659
                );
4✔
660
                if cooldown_expired && exceeds_sync_mode_threshold {
4✔
661
                    debug!("sync mode criterion satisfied.");
1✔
662

663
                    if peer_state_info.sync_challenge.is_some() {
1✔
664
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
665
                        return Ok(KEEP_CONNECTION_ALIVE);
×
666
                    }
1✔
667

1✔
668
                    info!(
1✔
669
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
670
                    );
671
                    let challenge = SyncChallenge::generate(
1✔
672
                        &block_notification,
1✔
673
                        tip_header.height,
1✔
674
                        self.rng.random(),
1✔
675
                    );
1✔
676
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
677
                        challenge,
1✔
678
                        block_notification.cumulative_proof_of_work,
1✔
679
                        self.now(),
1✔
680
                    ));
1✔
681

1✔
682
                    debug!("sending challenge ...");
1✔
683
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
684

685
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
686
                }
3✔
687

3✔
688
                peer_state_info.highest_shared_block_height = block_notification.height;
3✔
689
                let block_is_new = tip_header.cumulative_proof_of_work
3✔
690
                    < block_notification.cumulative_proof_of_work;
3✔
691

3✔
692
                debug!("block_is_new: {}", block_is_new);
3✔
693

694
                if block_is_new
3✔
695
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
3✔
696
                    && !sync_anchor_is_set
2✔
697
                    && !exceeds_sync_mode_threshold
2✔
698
                {
699
                    debug!(
1✔
700
                        "sending BlockRequestByHeight to peer for block with height {}",
×
701
                        block_notification.height
702
                    );
703
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
1✔
704
                        .await?;
1✔
705
                } else {
706
                    debug!(
2✔
707
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
708
                        block_notification.height,
×
709
                        block_is_new,
×
710
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
711
                    );
712
                }
713

714
                Ok(KEEP_CONNECTION_ALIVE)
3✔
715
            }
716
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
717
                let response = {
×
718
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
719

2✔
720
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
721

722
                    let response = self
2✔
723
                        .global_state_lock
2✔
724
                        .lock_guard()
2✔
725
                        .await
2✔
726
                        .response_to_sync_challenge(sync_challenge)
2✔
727
                        .await;
2✔
728

729
                    match response {
2✔
730
                        Ok(resp) => resp,
×
731
                        Err(e) => {
2✔
732
                            warn!("could not generate sync challenge response:\n{e}");
2✔
733
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
734
                                .await?;
2✔
735
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
736
                        }
737
                    }
738
                };
739

740
                info!(
×
741
                    "Responding to sync challenge from {}",
×
742
                    self.peer_address.ip()
×
743
                );
744
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
745
                    .await?;
×
746

747
                Ok(KEEP_CONNECTION_ALIVE)
×
748
            }
749
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
750
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
751

752
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
753
                info!(
1✔
754
                    "Got sync challenge response from {}",
×
755
                    self.peer_address.ip()
×
756
                );
757

758
                // The purpose of the sync challenge and sync challenge response
759
                // is to avoid going into sync mode based on a malicious target
760
                // fork. Instead of verifying that the claimed proof-of-work
761
                // number is correct (which would require sending and verifying,
762
                // at least, all blocks between luca (whatever that is) and the
763
                // claimed tip), we use a heuristic that requires less
764
                // communication and less verification work. The downside of
765
                // using a heuristic here is a nonzero false positive and false
766
                // negative rate. Note that the false negative event
767
                // (maliciously sending someone into sync mode based on a bogus
768
                // fork) still requires a significant amount of work from the
769
                // attacker, *in addition* to being lucky. Also, down the line
770
                // succinctness (and more specifically, recursive block
771
                // validation) obviates this entire subprotocol.
772

773
                // Did we issue a challenge?
774
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
775
                    warn!("Sync challenge response was not prompted.");
×
776
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
777
                        .await?;
×
778
                    return Ok(KEEP_CONNECTION_ALIVE);
×
779
                };
780

781
                // Reset the challenge, regardless of the response's success.
782
                peer_state_info.sync_challenge = None;
1✔
783

1✔
784
                // Does response match issued challenge?
1✔
785
                if !challenge_response.matches(issued_challenge) {
1✔
786
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
787
                        .await?;
×
788
                    return Ok(KEEP_CONNECTION_ALIVE);
×
789
                }
1✔
790

1✔
791
                // Does response verify?
1✔
792
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
793
                let now = self.now();
1✔
794
                if !challenge_response.is_valid(now).await {
1✔
795
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
796
                        .await?;
×
797
                    return Ok(KEEP_CONNECTION_ALIVE);
×
798
                }
1✔
799

800
                // Does cumulative proof-of-work evolve reasonably?
801
                let own_tip_header = *self
1✔
802
                    .global_state_lock
1✔
803
                    .lock_guard()
1✔
804
                    .await
1✔
805
                    .chain
806
                    .light_state()
1✔
807
                    .header();
1✔
808
                if !challenge_response
1✔
809
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
810
                {
811
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
812
                        .await?;
×
813
                    return Ok(KEEP_CONNECTION_ALIVE);
×
814
                }
1✔
815

1✔
816
                // Is there some specific (*i.e.*, not aggregate) proof of work?
1✔
817
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
818
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
819
                        .await?;
×
820
                    return Ok(KEEP_CONNECTION_ALIVE);
×
821
                }
1✔
822

1✔
823
                // Did it come in time?
1✔
824
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
825
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
826
                        .await?;
×
827
                    return Ok(KEEP_CONNECTION_ALIVE);
×
828
                }
1✔
829

1✔
830
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
831
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
832

1✔
833
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
834
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
835

1✔
836
                // Inform main loop
1✔
837
                self.to_main_tx
1✔
838
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
839
                        peer_address: self.peer_address,
1✔
840
                        claimed_height: claimed_tip_height,
1✔
841
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
842
                        claimed_block_mmra: sync_mmra_anchor,
1✔
843
                    })
1✔
844
                    .await?;
1✔
845

846
                Ok(KEEP_CONNECTION_ALIVE)
1✔
847
            }
848
            PeerMessage::BlockRequestByHash(block_digest) => {
×
849
                match self
×
850
                    .global_state_lock
×
851
                    .lock_guard()
×
852
                    .await
×
853
                    .chain
854
                    .archival_state()
×
855
                    .get_block(block_digest)
×
856
                    .await?
×
857
                {
858
                    None => {
859
                        // TODO: Consider punishing here
860
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
861
                        Ok(KEEP_CONNECTION_ALIVE)
×
862
                    }
863
                    Some(b) => {
×
864
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
865
                            .await?;
×
866
                        Ok(KEEP_CONNECTION_ALIVE)
×
867
                    }
868
                }
869
            }
870
            PeerMessage::BlockRequestByHeight(block_height) => {
4✔
871
                let block_response = {
3✔
872
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
4✔
873

4✔
874
                    debug!("Got BlockRequestByHeight of height {}", block_height);
4✔
875

876
                    let canonical_block_digest = self
4✔
877
                        .global_state_lock
4✔
878
                        .lock_guard()
4✔
879
                        .await
4✔
880
                        .chain
881
                        .archival_state()
4✔
882
                        .archival_block_mmr
4✔
883
                        .ammr()
4✔
884
                        .try_get_leaf(block_height.into())
4✔
885
                        .await;
4✔
886

887
                    let canonical_block_digest = match canonical_block_digest {
4✔
888
                        None => {
889
                            let own_tip_height = self
1✔
890
                                .global_state_lock
1✔
891
                                .lock_guard()
1✔
892
                                .await
1✔
893
                                .chain
894
                                .light_state()
1✔
895
                                .header()
1✔
896
                                .height;
1✔
897
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
898
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
899
                                .await?;
1✔
900

901
                            return Ok(KEEP_CONNECTION_ALIVE);
1✔
902
                        }
903
                        Some(digest) => digest,
3✔
904
                    };
905

906
                    let canonical_chain_block: Block = self
3✔
907
                        .global_state_lock
3✔
908
                        .lock_guard()
3✔
909
                        .await
3✔
910
                        .chain
911
                        .archival_state()
3✔
912
                        .get_block(canonical_block_digest)
3✔
913
                        .await?
3✔
914
                        .unwrap();
3✔
915

3✔
916
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
3✔
917
                };
3✔
918

3✔
919
                debug!("Sending block");
3✔
920
                peer.send(block_response).await?;
3✔
921
                debug!("Sent block");
3✔
922
                Ok(KEEP_CONNECTION_ALIVE)
3✔
923
            }
924
            PeerMessage::Block(t_block) => {
20✔
925
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
20✔
926

20✔
927
                info!(
20✔
928
                    "Got new block from peer {}, height {}, mined {}",
×
929
                    self.peer_address,
×
930
                    t_block.header.height,
×
931
                    t_block.header.timestamp.standard_format()
×
932
                );
933
                let new_block_height = t_block.header.height;
20✔
934

935
                let block = match Block::try_from(*t_block) {
20✔
936
                    Ok(block) => Box::new(block),
20✔
937
                    Err(e) => {
×
938
                        warn!("Peer sent invalid block: {e:?}");
×
939
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
940
                            .await?;
×
941

942
                        return Ok(KEEP_CONNECTION_ALIVE);
×
943
                    }
944
                };
945

946
                // Update the value for the highest known height that peer possesses iff
947
                // we are not in a fork reconciliation state.
948
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
20✔
949
                    peer_state_info.highest_shared_block_height = new_block_height;
10✔
950
                }
10✔
951

952
                self.try_ensure_path(block, peer, peer_state_info).await?;
20✔
953

954
                // Reward happens as part of `try_ensure_path`
955

956
                Ok(KEEP_CONNECTION_ALIVE)
19✔
957
            }
958
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
959
                known_blocks,
8✔
960
                max_response_len,
8✔
961
                anchor,
8✔
962
            }) => {
8✔
963
                debug!(
8✔
964
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
965
                    self.peer_address
966
                );
967

968
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
969
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
970
                        .await?;
×
971

972
                    return Ok(KEEP_CONNECTION_ALIVE);
×
973
                }
8✔
974

975
                // The last block in the list of the peer's known blocks is the
976
                // earliest block, block with lowest height, the peer has
977
                // requested. If it does not belong to canonical chain, none of
978
                // the later will. So we can do an early abort in that case.
979
                let least_preferred = match known_blocks.last() {
8✔
980
                    Some(least_preferred) => *least_preferred,
8✔
981
                    None => {
982
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
983
                            .await?;
×
984

985
                        return Ok(KEEP_CONNECTION_ALIVE);
×
986
                    }
987
                };
988

989
                let state = self.global_state_lock.lock_guard().await;
8✔
990
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
991
                let luca_is_known = state
8✔
992
                    .chain
8✔
993
                    .archival_state()
8✔
994
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
995
                    .await;
8✔
996
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
997
                    drop(state);
×
998
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
999
                        .await?;
×
1000
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1001

1002
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1003
                }
8✔
1004

1005
                // Happy case: At least *one* of the blocks referenced by peer
1006
                // is known to us.
1007
                let first_block_in_response = {
8✔
1008
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1009
                    for block_digest in known_blocks {
10✔
1010
                        if state
10✔
1011
                            .chain
10✔
1012
                            .archival_state()
10✔
1013
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1014
                            .await
10✔
1015
                        {
1016
                            let height = state
8✔
1017
                                .chain
8✔
1018
                                .archival_state()
8✔
1019
                                .get_block_header(block_digest)
8✔
1020
                                .await
8✔
1021
                                .unwrap()
8✔
1022
                                .height;
8✔
1023
                            first_block_in_response = Some(height);
8✔
1024
                            debug!(
8✔
1025
                                "Found block in canonical chain for batch response: {}",
×
1026
                                block_digest
1027
                            );
1028
                            break;
8✔
1029
                        }
2✔
1030
                    }
1031

1032
                    first_block_in_response
8✔
1033
                        .expect("existence of LUCA should have been established already.")
8✔
1034
                };
8✔
1035

8✔
1036
                debug!(
8✔
1037
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1038
                 Now building response from that height."
×
1039
                );
1040

1041
                // Get the relevant blocks, at most batch-size many, descending from the
1042
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1043
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1044
                let max_response_len = cmp::min(
8✔
1045
                    max_response_len,
8✔
1046
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1047
                );
8✔
1048
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1049
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1050

8✔
1051
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1052
                let response_start_height: u64 = first_block_in_response.into();
8✔
1053
                let mut i: u64 = 1;
8✔
1054
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1055
                    let block_height = response_start_height + i;
31✔
1056
                    match state
31✔
1057
                        .chain
31✔
1058
                        .archival_state()
31✔
1059
                        .archival_block_mmr
31✔
1060
                        .ammr()
31✔
1061
                        .try_get_leaf(block_height)
31✔
1062
                        .await
31✔
1063
                    {
1064
                        Some(digest) => {
23✔
1065
                            digests_of_returned_blocks.push(digest);
23✔
1066
                        }
23✔
1067
                        None => break,
8✔
1068
                    }
1069
                    i += 1;
23✔
1070
                }
1071

1072
                let mut returned_blocks: Vec<Block> =
8✔
1073
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1074
                for block_digest in digests_of_returned_blocks {
31✔
1075
                    let block = state
23✔
1076
                        .chain
23✔
1077
                        .archival_state()
23✔
1078
                        .get_block(block_digest)
23✔
1079
                        .await?
23✔
1080
                        .unwrap();
23✔
1081
                    returned_blocks.push(block);
23✔
1082
                }
1083

1084
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1085

1086
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1087
                drop(state);
8✔
1088

1089
                let Some(response) = response else {
8✔
1090
                    warn!("Unable to satisfy batch-block request");
×
1091
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1092
                        .await?;
×
1093
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1094
                };
1095

1096
                debug!("Returning {} blocks in batch response", response.len());
8✔
1097

1098
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1099
                peer.send(response).await?;
8✔
1100

1101
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1102
            }
1103
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1104
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1105

×
1106
                debug!(
×
1107
                    "handling block response batch with {} blocks",
×
1108
                    authenticated_blocks.len()
×
1109
                );
1110

1111
                // (Alan:) why is there even a minimum?
1112
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1113
                    warn!("Got smaller batch response than allowed");
×
1114
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1115
                        .await?;
×
1116
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1117
                }
×
1118

1119
                // Verify that we are in fact in syncing mode
1120
                // TODO: Separate peer messages into those allowed under syncing
1121
                // and those that are not
1122
                let Some(sync_anchor) = self
×
1123
                    .global_state_lock
×
1124
                    .lock_guard()
×
1125
                    .await
×
1126
                    .net
1127
                    .sync_anchor
1128
                    .clone()
×
1129
                else {
1130
                    warn!("Received a batch of blocks without being in syncing mode");
×
1131
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1132
                        .await?;
×
1133
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1134
                };
1135

1136
                // Verify that the response matches the current state
1137
                // We get the latest block from the DB here since this message is
1138
                // only valid for archival nodes.
1139
                let (first_block, _) = &authenticated_blocks[0];
×
1140
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1141
                let most_canonical_own_block_match: Option<Block> = self
×
1142
                    .global_state_lock
×
1143
                    .lock_guard()
×
1144
                    .await
×
1145
                    .chain
1146
                    .archival_state()
×
1147
                    .get_block(first_blocks_parent_digest)
×
1148
                    .await
×
1149
                    .expect("Block lookup must succeed");
×
1150
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1151
                    Some(block) => block,
×
1152
                    None => {
1153
                        warn!("Got batch response with invalid start block");
×
1154
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1155
                            .await?;
×
1156
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1157
                    }
1158
                };
1159

1160
                // Convert all blocks to Block objects
1161
                debug!(
×
1162
                    "Found own block of height {} to match received batch",
×
1163
                    most_canonical_own_block_match.kernel.header.height
×
1164
                );
1165
                let mut received_blocks = vec![];
×
1166
                for (t_block, membership_proof) in authenticated_blocks {
×
1167
                    let Ok(block) = Block::try_from(t_block) else {
×
1168
                        warn!("Received invalid transfer block from peer");
×
1169
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1170
                            .await?;
×
1171
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1172
                    };
1173

1174
                    if !membership_proof.verify(
×
1175
                        block.header().height.into(),
×
1176
                        block.hash(),
×
1177
                        &sync_anchor.block_mmr.peaks(),
×
1178
                        sync_anchor.block_mmr.num_leafs(),
×
1179
                    ) {
×
1180
                        warn!("Authentication of received block fails relative to anchor");
×
1181
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1182
                            .await?;
×
1183
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1184
                    }
×
1185

×
1186
                    received_blocks.push(block);
×
1187
                }
1188

1189
                // Get the latest block that we know of and handle all received blocks
1190
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1191
                    .await?;
×
1192

1193
                // Reward happens as part of `handle_blocks`.
1194

1195
                Ok(KEEP_CONNECTION_ALIVE)
×
1196
            }
1197
            PeerMessage::UnableToSatisfyBatchRequest => {
1198
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1199
                warn!(
×
1200
                    "Peer {} reports inability to satisfy batch request.",
×
1201
                    self.peer_address
1202
                );
1203

1204
                Ok(KEEP_CONNECTION_ALIVE)
×
1205
            }
1206
            PeerMessage::Handshake(_) => {
1207
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1208

×
1209
                // The handshake should have been sent during connection
×
1210
                // initialization. Here it is out of order at best, malicious at
×
1211
                // worst.
×
1212
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1213
                Ok(KEEP_CONNECTION_ALIVE)
×
1214
            }
1215
            PeerMessage::ConnectionStatus(_) => {
1216
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1217

×
1218
                // The connection status should have been sent during connection
×
1219
                // initialization. Here it is out of order at best, malicious at
×
1220
                // worst.
×
1221

×
1222
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1223
                Ok(KEEP_CONNECTION_ALIVE)
×
1224
            }
1225
            PeerMessage::Transaction(transaction) => {
2✔
1226
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
2✔
1227

2✔
1228
                debug!(
2✔
1229
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1230
                    transaction.kernel.inputs.len(),
×
1231
                    transaction.kernel.outputs.len(),
×
1232
                    transaction.kernel.mutator_set_hash
×
1233
                );
1234

1235
                let transaction: Transaction = (*transaction).into();
2✔
1236

2✔
1237
                // 1. If transaction is invalid, punish.
2✔
1238
                if !transaction.is_valid().await {
2✔
1239
                    warn!("Received invalid tx");
×
1240
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1241
                        .await?;
×
1242
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1243
                }
2✔
1244

2✔
1245
                // 2. If transaction has coinbase, punish.
2✔
1246
                // Transactions received from peers have not been mined yet.
2✔
1247
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
2✔
1248
                if transaction.kernel.coinbase.is_some() {
2✔
1249
                    warn!("Received non-mined transaction with coinbase.");
×
1250
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1251
                        .await?;
×
1252
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1253
                }
2✔
1254

2✔
1255
                // 3. If negative fee, punish.
2✔
1256
                if transaction.kernel.fee.is_negative() {
2✔
1257
                    warn!("Received negative-fee transaction.");
×
1258
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1259
                        .await?;
×
1260
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1261
                }
2✔
1262

2✔
1263
                // 4. If transaction is already known, ignore.
2✔
1264
                if self
2✔
1265
                    .global_state_lock
2✔
1266
                    .lock_guard()
2✔
1267
                    .await
2✔
1268
                    .mempool
1269
                    .contains_with_higher_proof_quality(
1270
                        transaction.kernel.txid(),
2✔
1271
                        transaction.proof.proof_quality()?,
2✔
1272
                    )
1273
                {
1274
                    warn!("Received transaction that was already known");
×
1275

1276
                    // We received a transaction that we *probably* haven't requested.
1277
                    // Consider punishing here, if this is abused.
1278
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1279
                }
2✔
1280

1281
                // 5. If transaction is not confirmable, punish.
1282
                let (tip, mutator_set_accumulator_after) = {
2✔
1283
                    let state = self.global_state_lock.lock_guard().await;
2✔
1284

1285
                    (
2✔
1286
                        state.chain.light_state().hash(),
2✔
1287
                        state.chain.light_state().mutator_set_accumulator_after(),
2✔
1288
                    )
2✔
1289
                };
2✔
1290
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
2✔
1291
                    warn!(
×
1292
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1293
                        transaction.kernel.txid()
×
1294
                    );
1295
                    // get fine-grained error code for informative logging
1296
                    let confirmability_error_code = transaction
×
1297
                        .kernel
×
1298
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1299
                    match confirmability_error_code {
×
1300
                        Ok(_) => unreachable!(),
×
1301
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1302
                            warn!("invalid removal record (at index {index})");
×
1303
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1304
                            let removal_record_error_code = invalid_removal_record
×
1305
                                .validate_inner(&mutator_set_accumulator_after);
×
1306
                            debug!(
×
1307
                                "Absolute index set of removal record {index}: {:?}",
×
1308
                                invalid_removal_record.absolute_indices
1309
                            );
1310
                            match removal_record_error_code {
×
1311
                                Ok(_) => unreachable!(),
×
1312
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1313
                                    debug!("invalid because membership proof is missing");
×
1314
                                }
1315
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1316
                                    chunk_index,
×
1317
                                }) => {
×
1318
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1319
                                }
1320
                            };
1321
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1322
                                .await?;
×
1323
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1324
                        }
1325
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1326
                            warn!("duplicate inputs");
×
1327
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1328
                                .await?;
×
1329
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1330
                        }
1331
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1332
                            warn!("already spent input (at index {index})");
×
1333
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1334
                                .await?;
×
1335
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1336
                        }
1337
                    };
1338
                }
2✔
1339

2✔
1340
                // If transaction cannot be applied to mutator set, punish.
2✔
1341
                // I don't think this can happen when above checks pass but we include
2✔
1342
                // the check to ensure that transaction can be applied.
2✔
1343
                let ms_update = MutatorSetUpdate::new(
2✔
1344
                    transaction.kernel.inputs.clone(),
2✔
1345
                    transaction.kernel.outputs.clone(),
2✔
1346
                );
2✔
1347
                let can_apply = ms_update
2✔
1348
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
2✔
1349
                    .is_ok();
2✔
1350
                if !can_apply {
2✔
1351
                    warn!("Cannot apply transaction to current mutator set.");
×
1352
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1353
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1354
                        .await?;
×
1355
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1356
                }
2✔
1357

2✔
1358
                let tx_timestamp = transaction.kernel.timestamp;
2✔
1359

2✔
1360
                // 6. Ignore if transaction is too old
2✔
1361
                let now = self.now();
2✔
1362
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
2✔
1363
                    // TODO: Consider punishing here
1364
                    warn!("Received too old tx");
×
1365
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1366
                }
2✔
1367

2✔
1368
                // 7. Ignore if transaction is too far into the future
2✔
1369
                if tx_timestamp
2✔
1370
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
2✔
1371
                {
1372
                    // TODO: Consider punishing here
1373
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1374
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1375
                }
2✔
1376

2✔
1377
                // Otherwise, relay to main
2✔
1378
                let pt2m_transaction = PeerTaskToMainTransaction {
2✔
1379
                    transaction,
2✔
1380
                    confirmable_for_block: tip,
2✔
1381
                };
2✔
1382
                self.to_main_tx
2✔
1383
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
2✔
1384
                    .await?;
2✔
1385

1386
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1387
            }
1388
            PeerMessage::TransactionNotification(tx_notification) => {
6✔
1389
                // addresses #457
6✔
1390
                // new scope for state read-lock to avoid holding across peer.send()
6✔
1391
                {
6✔
1392
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
6✔
1393

1394
                    // 1. Ignore if we already know this transaction, and
1395
                    // the proof quality is not higher than what we already know.
1396
                    let state = self.global_state_lock.lock_guard().await;
6✔
1397
                    let transaction_of_same_or_higher_proof_quality_is_known =
6✔
1398
                        state.mempool.contains_with_higher_proof_quality(
6✔
1399
                            tx_notification.txid,
6✔
1400
                            tx_notification.proof_quality,
6✔
1401
                        );
6✔
1402
                    if transaction_of_same_or_higher_proof_quality_is_known {
6✔
1403
                        debug!("transaction with same or higher proof quality was already known");
4✔
1404
                        return Ok(KEEP_CONNECTION_ALIVE);
4✔
1405
                    }
2✔
1406

2✔
1407
                    // Only accept transactions that do not require executing
2✔
1408
                    // `update`.
2✔
1409
                    if state
2✔
1410
                        .chain
2✔
1411
                        .light_state()
2✔
1412
                        .mutator_set_accumulator_after()
2✔
1413
                        .hash()
2✔
1414
                        != tx_notification.mutator_set_hash
2✔
1415
                    {
1416
                        debug!("transaction refers to non-canonical mutator set state");
×
1417
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1418
                    }
2✔
1419
                }
2✔
1420

2✔
1421
                // 2. Request the actual `Transaction` from peer
2✔
1422
                debug!("requesting transaction from peer");
2✔
1423
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
2✔
1424
                    .await?;
2✔
1425

1426
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1427
            }
1428
            PeerMessage::TransactionRequest(transaction_identifier) => {
×
1429
                if let Some(transaction) = self
×
1430
                    .global_state_lock
×
1431
                    .lock_guard()
×
1432
                    .await
×
1433
                    .mempool
1434
                    .get(transaction_identifier)
×
1435
                {
1436
                    if let Ok(transfer_transaction) = transaction.try_into() {
×
1437
                        peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
×
1438
                            .await?;
×
1439
                    } else {
1440
                        warn!("Peer requested transaction that cannot be converted to transfer object");
×
1441
                    }
1442
                }
×
1443

1444
                Ok(KEEP_CONNECTION_ALIVE)
×
1445
            }
1446
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1447
                let verdict = self
1✔
1448
                    .global_state_lock
1✔
1449
                    .lock_guard()
1✔
1450
                    .await
1✔
1451
                    .favor_incoming_block_proposal(
1✔
1452
                        block_proposal_notification.height,
1✔
1453
                        block_proposal_notification.guesser_fee,
1✔
1454
                    );
1✔
1455
                match verdict {
1✔
1456
                    Ok(_) => {
1457
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1458
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1459
                        ))
1✔
1460
                        .await?
1✔
1461
                    }
NEW
1462
                    Err(reject_reason) => {
×
NEW
1463
                        info!(
×
NEW
1464
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
NEW
1465
                        {}. Reason:\n{reject_reason}",
×
NEW
1466
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1467
                        self.peer_address
1468
                    )
1469
                    }
1470
                }
1471

1472
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1473
            }
1474
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1475
                let matching_proposal = self
×
1476
                    .global_state_lock
×
1477
                    .lock_guard()
×
1478
                    .await
×
1479
                    .mining_state
1480
                    .block_proposal
1481
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1482
                    .map(|x| x.to_owned());
×
1483
                if let Some(proposal) = matching_proposal {
×
1484
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1485
                        .await?;
×
1486
                } else {
1487
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1488
                        .await?;
×
1489
                }
1490

1491
                Ok(KEEP_CONNECTION_ALIVE)
×
1492
            }
1493
            PeerMessage::BlockProposal(block) => {
1✔
1494
                info!("Got block proposal from peer.");
1✔
1495

1496
                let should_punish = {
1✔
1497
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1498

1499
                    let (verdict, tip) = {
1✔
1500
                        let state = self.global_state_lock.lock_guard().await;
1✔
1501

1502
                        let verdict = state.favor_incoming_block_proposal(
1✔
1503
                            block.header().height,
1✔
1504
                            block.total_guesser_reward(),
1✔
1505
                        );
1✔
1506
                        let tip = state.chain.light_state().to_owned();
1✔
1507
                        (verdict, tip)
1✔
1508
                    };
1509

1510
                    if let Err(rejection_reason) = verdict {
1✔
1511
                        match rejection_reason {
×
1512
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1513
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1514
                                if Some(received) == current =>
×
1515
                            {
×
1516
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1517
                                None
×
1518
                            }
1519
                            _ => {
1520
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1521
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1522
                            }
1523
                        }
1524
                    } else {
1525
                        // Verify validity and that proposal is child of current tip
1526
                        if block.is_valid(&tip, self.now()).await {
1✔
1527
                            None // all is well, no punishment.
1✔
1528
                        } else {
1529
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1530
                        }
1531
                    }
1532
                };
1533

1534
                if let Some(sanction) = should_punish {
1✔
1535
                    self.punish(sanction).await?;
×
1536
                } else {
1537
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1538
                        .await?;
1✔
1539

1540
                    // Valuable, new, hard-to-produce information. Reward peer.
1541
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1542
                }
1543

1544
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1545
            }
1546
        }
1547
    }
89✔
1548

1549
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1550
    ///
1551
    /// the channel could potentially fill up in which case the send() will
1552
    /// block until there is capacity.  we wrap the send() so we can log if
1553
    /// that ever happens to the extent it passes slow-scope threshold.
1554
    async fn send_to_main(
1✔
1555
        &self,
1✔
1556
        msg: PeerTaskToMain,
1✔
1557
        line: u32,
1✔
1558
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1559
        // we measure across the send() in case the channel ever fills up.
1✔
1560
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1561

1✔
1562
        self.to_main_tx.send(msg).await
1✔
1563
    }
1✔
1564

1565
    /// Handle message from main task. The boolean return value indicates if
1566
    /// the connection should be closed.
1567
    ///
1568
    /// Locking:
1569
    ///   * acquires `global_state_lock` for write via Self::punish()
1570
    async fn handle_main_task_message<S>(
×
1571
        &mut self,
×
1572
        msg: MainToPeerTask,
×
1573
        peer: &mut S,
×
1574
        peer_state_info: &mut MutablePeerState,
×
1575
    ) -> Result<bool>
×
1576
    where
×
1577
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
×
1578
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
×
1579
        <S as TryStream>::Error: std::error::Error,
×
1580
    {
×
1581
        debug!("Handling {} message from main in peer loop", msg.get_type());
×
1582
        match msg {
×
1583
            MainToPeerTask::Block(block) => {
×
1584
                // We don't currently differentiate whether a new block came from a peer, or from our
×
1585
                // own miner. It's always shared through this logic.
×
1586
                let new_block_height = block.kernel.header.height;
×
1587
                if new_block_height > peer_state_info.highest_shared_block_height {
×
1588
                    debug!("Sending PeerMessage::BlockNotification");
×
1589
                    peer_state_info.highest_shared_block_height = new_block_height;
×
1590
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
×
1591
                        .await?;
×
1592
                    debug!("Sent PeerMessage::BlockNotification");
×
1593
                }
×
1594
                Ok(KEEP_CONNECTION_ALIVE)
×
1595
            }
1596
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1597
                // Only ask one of the peers about the batch of blocks
×
1598
                if batch_block_request.peer_addr_target != self.peer_address {
×
1599
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1600
                }
×
1601

×
1602
                let max_response_len = std::cmp::min(
×
1603
                    STANDARD_BLOCK_BATCH_SIZE,
×
1604
                    self.global_state_lock.cli().sync_mode_threshold,
×
1605
                );
×
1606

×
1607
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1608
                    known_blocks: batch_block_request.known_blocks,
×
1609
                    max_response_len,
×
1610
                    anchor: batch_block_request.anchor_mmr,
×
1611
                }))
×
1612
                .await?;
×
1613

1614
                Ok(KEEP_CONNECTION_ALIVE)
×
1615
            }
1616
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1617
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1618

×
1619
                if self.peer_address != socket_addr {
×
1620
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1621
                }
×
1622

×
1623
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1624
                    .await?;
×
1625

1626
                // If this peer failed the last synchronization attempt, we only
1627
                // sanction, we don't disconnect.
1628
                Ok(KEEP_CONNECTION_ALIVE)
×
1629
            }
1630
            MainToPeerTask::MakePeerDiscoveryRequest => {
1631
                peer.send(PeerMessage::PeerListRequest).await?;
×
1632
                Ok(KEEP_CONNECTION_ALIVE)
×
1633
            }
1634
            MainToPeerTask::Disconnect(peer_address) => {
×
1635
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1636

×
1637
                // Only disconnect from the peer the main task requested a disconnect for.
×
1638
                if peer_address != self.peer_address {
×
1639
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1640
                }
×
1641
                self.register_peer_disconnection().await;
×
1642

1643
                Ok(DISCONNECT_CONNECTION)
×
1644
            }
1645
            MainToPeerTask::DisconnectAll() => {
1646
                self.register_peer_disconnection().await;
×
1647

1648
                Ok(DISCONNECT_CONNECTION)
×
1649
            }
1650
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1651
                if target_socket_addr == self.peer_address {
×
1652
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1653
                }
×
1654
                Ok(KEEP_CONNECTION_ALIVE)
×
1655
            }
1656
            MainToPeerTask::TransactionNotification(transaction_notification) => {
×
1657
                debug!("Sending PeerMessage::TransactionNotification");
×
1658
                peer.send(PeerMessage::TransactionNotification(
×
1659
                    transaction_notification,
×
1660
                ))
×
1661
                .await?;
×
1662
                debug!("Sent PeerMessage::TransactionNotification");
×
1663
                Ok(KEEP_CONNECTION_ALIVE)
×
1664
            }
1665
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1666
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1667
                peer.send(PeerMessage::BlockProposalNotification(
×
1668
                    block_proposal_notification,
×
1669
                ))
×
1670
                .await?;
×
1671
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1672
                Ok(KEEP_CONNECTION_ALIVE)
×
1673
            }
1674
        }
1675
    }
×
1676

1677
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1678
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1679
    async fn run<S>(
39✔
1680
        &mut self,
39✔
1681
        mut peer: S,
39✔
1682
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
39✔
1683
        peer_state_info: &mut MutablePeerState,
39✔
1684
    ) -> Result<()>
39✔
1685
    where
39✔
1686
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
39✔
1687
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
39✔
1688
        <S as TryStream>::Error: std::error::Error,
39✔
1689
    {
39✔
1690
        loop {
1691
            select! {
89✔
1692
                // Handle peer messages
1693
                peer_message = peer.try_next() => {
89✔
1694
                    let peer_address = self.peer_address;
89✔
1695
                    let peer_message = match peer_message {
89✔
1696
                        Ok(message) => message,
89✔
1697
                        Err(err) => {
×
1698
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
1699
                            error!("{msg}. Error: {err}");
×
1700
                            bail!("{msg}. Closing connection.");
×
1701
                        }
1702
                    };
1703
                    let Some(peer_message) = peer_message else {
89✔
1704
                        info!("Peer {peer_address} closed connection.");
×
1705
                        break;
×
1706
                    };
1707

1708
                    let syncing =
89✔
1709
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
89✔
1710
                    let message_type = peer_message.get_type();
89✔
1711
                    if peer_message.ignore_during_sync() && syncing {
89✔
1712
                        debug!(
×
1713
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1714
                        );
1715
                        continue;
×
1716
                    }
89✔
1717
                    if peer_message.ignore_when_not_sync() && !syncing {
89✔
1718
                        debug!(
×
1719
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1720
                        );
1721
                        continue;
×
1722
                    }
89✔
1723

89✔
1724
                    match self
89✔
1725
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
89✔
1726
                        .await
89✔
1727
                    {
1728
                        Ok(false) => {}
50✔
1729
                        Ok(true) => {
1730
                            info!("Closing connection to {peer_address}");
38✔
1731
                            break;
38✔
1732
                        }
1733
                        Err(err) => {
1✔
1734
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1735
                            bail!("{err}");
1✔
1736
                        }
1737
                    };
1738
                }
1739

1740
                // Handle messages from main task
1741
                main_msg_res = from_main_rx.recv() => {
89✔
1742
                    let main_msg = main_msg_res
×
1743
                        .unwrap_or_else(|e| panic!("Failed to read from main loop: {e}"));
×
1744
                    let close_connection = self
×
1745
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
×
1746
                        .await
×
1747
                        .unwrap_or_else(|err| {
×
1748
                            warn!("handle_main_task_message returned an error: {err}");
×
1749
                            true
×
1750
                        });
×
1751

×
1752
                    if close_connection {
×
1753
                        info!(
×
1754
                            "handle_main_task_message is closing the connection to {}",
×
1755
                            self.peer_address
1756
                        );
1757
                        break;
×
1758
                    }
×
1759
                }
1760
            }
1761
        }
1762

1763
        Ok(())
38✔
1764
    }
39✔
1765

1766
    /// Function called before entering the peer loop. Reads the potentially stored
1767
    /// peer standing from the database and does other book-keeping before entering
1768
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1769
    /// accepted for a connection for this loop to be entered. So we don't need
1770
    /// to check the standing again.
1771
    ///
1772
    /// Locking:
1773
    ///   * acquires `global_state_lock` for write
1774
    pub(crate) async fn run_wrapper<S>(
32✔
1775
        &mut self,
32✔
1776
        mut peer: S,
32✔
1777
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
32✔
1778
    ) -> Result<()>
32✔
1779
    where
32✔
1780
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
32✔
1781
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
32✔
1782
        <S as TryStream>::Error: std::error::Error,
32✔
1783
    {
32✔
1784
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1785

1786
        let cli_args = self.global_state_lock.cli().clone();
32✔
1787
        let global_state = self.global_state_lock.lock_guard().await;
32✔
1788

1789
        let standing = global_state
32✔
1790
            .net
32✔
1791
            .peer_databases
32✔
1792
            .peer_standings
32✔
1793
            .get(self.peer_address.ip())
32✔
1794
            .await
32✔
1795
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
31✔
1796

31✔
1797
        // Add peer to peer map
31✔
1798
        let peer_connection_info = PeerConnectionInfo::new(
31✔
1799
            self.peer_handshake_data.listen_port,
31✔
1800
            self.peer_address,
31✔
1801
            self.inbound_connection,
31✔
1802
        );
31✔
1803
        let new_peer = PeerInfo::new(
31✔
1804
            peer_connection_info,
31✔
1805
            &self.peer_handshake_data,
31✔
1806
            SystemTime::now(),
31✔
1807
            cli_args.peer_tolerance,
31✔
1808
        )
31✔
1809
        .with_standing(standing);
31✔
1810

31✔
1811
        // If timestamps are different, we currently just log a warning.
31✔
1812
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
31✔
1813
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
31✔
1814
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
31✔
1815
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
31✔
1816
        {
1817
            let own_datetime_utc: DateTime<Utc> =
×
1818
                new_peer.own_timestamp_connection_established.into();
×
1819
            let peer_datetime_utc: DateTime<Utc> =
×
1820
                new_peer.peer_timestamp_connection_established.into();
×
1821
            warn!(
×
1822
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1823
                new_peer.connected_address(),
×
1824
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1825
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1826
        }
31✔
1827

1828
        // There is potential for a race-condition in the peer_map here, as we've previously
1829
        // counted the number of entries and checked if instance ID was already connected. But
1830
        // this check could have been invalidated by other tasks so we perform it again
1831

1832
        if global_state
31✔
1833
            .net
31✔
1834
            .peer_map
31✔
1835
            .values()
31✔
1836
            .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
31✔
1837
        {
1838
            bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1839
        }
31✔
1840

31✔
1841
        if global_state.net.peer_map.len() >= cli_args.max_num_peers {
31✔
1842
            bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1843
        }
31✔
1844

31✔
1845
        if global_state.net.peer_map.contains_key(&self.peer_address) {
31✔
1846
            // This shouldn't be possible, unless the peer reports a different instance ID than
1847
            // for the other connection. Only a malignant client would do that.
1848
            bail!("Already connected to peer. Aborting connection");
×
1849
        }
31✔
1850
        drop(global_state);
31✔
1851

31✔
1852
        self.global_state_lock
31✔
1853
            .lock_mut(|s| s.net.peer_map.insert(self.peer_address, new_peer))
31✔
1854
            .await;
31✔
1855

1856
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
1857
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
31✔
1858

31✔
1859
        // If peer indicates more canonical block, request a block notification to catch up ASAP
31✔
1860
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
31✔
1861
            > self
31✔
1862
                .global_state_lock
31✔
1863
                .lock_guard()
31✔
1864
                .await
31✔
1865
                .chain
1866
                .light_state()
31✔
1867
                .kernel
1868
                .header
1869
                .cumulative_proof_of_work
1870
        {
1871
            // Send block notification request to catch up ASAP, in case we're
1872
            // behind the newly-connected peer.
1873
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1874
        }
31✔
1875

1876
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
31✔
1877
        debug!("Exited peer loop for {}", self.peer_address);
31✔
1878

1879
        close_peer_connected_callback(
31✔
1880
            self.global_state_lock.clone(),
31✔
1881
            self.peer_address,
31✔
1882
            &self.to_main_tx,
31✔
1883
        )
31✔
1884
        .await;
31✔
1885

1886
        debug!("Ending peer loop for {}", self.peer_address);
31✔
1887

1888
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1889
        // feature to have for testing purposes.
1890
        res
31✔
1891
    }
31✔
1892

1893
    /// Register graceful peer disconnection in the global state.
1894
    ///
1895
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1896
    ///
1897
    /// # Locking:
1898
    ///   * acquires `global_state_lock` for write
1899
    ///
1900
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1901
    async fn register_peer_disconnection(&mut self) {
×
1902
        let peer_id = self.peer_handshake_data.instance_id;
×
1903
        self.global_state_lock
×
1904
            .lock_guard_mut()
×
1905
            .await
×
1906
            .net
1907
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1908
    }
×
1909
}
1910

1911
#[cfg(test)]
1912
mod peer_loop_tests {
1913
    use rand::rngs::StdRng;
1914
    use rand::Rng;
1915
    use rand::SeedableRng;
1916
    use tokio::sync::mpsc::error::TryRecvError;
1917
    use tracing_test::traced_test;
1918

1919
    use super::*;
1920
    use crate::config_models::cli_args;
1921
    use crate::config_models::network::Network;
1922
    use crate::job_queue::triton_vm::TritonVmJobQueue;
1923
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
1924
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1925
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1926
    use crate::models::peer::transaction_notification::TransactionNotification;
1927
    use crate::models::state::mempool::TransactionOrigin;
1928
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1929
    use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
1930
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1931
    use crate::tests::shared::fake_valid_block_for_tests;
1932
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1933
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1934
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1935
    use crate::tests::shared::get_dummy_socket_address;
1936
    use crate::tests::shared::get_test_genesis_setup;
1937
    use crate::tests::shared::Action;
1938
    use crate::tests::shared::Mock;
1939

1940
    #[traced_test]
×
1941
    #[tokio::test]
1942
    async fn test_peer_loop_bye() -> Result<()> {
1✔
1943
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
1944

1✔
1945
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
1946
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1✔
1947

1✔
1948
        let peer_address = get_dummy_socket_address(2);
1✔
1949
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
1950
        let mut peer_loop_handler =
1✔
1951
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1✔
1952
        peer_loop_handler
1✔
1953
            .run_wrapper(mock, from_main_rx_clone)
1✔
1954
            .await?;
1✔
1955

1✔
1956
        assert_eq!(
1✔
1957
            2,
1✔
1958
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
1959
            "peer map length must be back to 2 after goodbye"
1✔
1960
        );
1✔
1961

1✔
1962
        Ok(())
1✔
1963
    }
1✔
1964

1965
    #[traced_test]
×
1966
    #[tokio::test]
1967
    async fn test_peer_loop_peer_list() {
1✔
1968
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
1✔
1969
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
1✔
1970
                .await
1✔
1971
                .unwrap();
1✔
1972

1✔
1973
        let mut peer_infos = state_lock
1✔
1974
            .lock_guard()
1✔
1975
            .await
1✔
1976
            .net
1✔
1977
            .peer_map
1✔
1978
            .clone()
1✔
1979
            .into_values()
1✔
1980
            .collect::<Vec<_>>();
1✔
1981
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2✔
1982
        let (peer_address0, instance_id0) = (
1✔
1983
            peer_infos[0].connected_address(),
1✔
1984
            peer_infos[0].instance_id(),
1✔
1985
        );
1✔
1986
        let (peer_address1, instance_id1) = (
1✔
1987
            peer_infos[1].connected_address(),
1✔
1988
            peer_infos[1].instance_id(),
1✔
1989
        );
1✔
1990

1✔
1991
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
1✔
1992
        let expected_response = vec![
1✔
1993
            (peer_address0, instance_id0),
1✔
1994
            (peer_address1, instance_id1),
1✔
1995
            (sa2, hsd2.instance_id),
1✔
1996
        ];
1✔
1997
        let mock = Mock::new(vec![
1✔
1998
            Action::Read(PeerMessage::PeerListRequest),
1✔
1999
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
1✔
2000
            Action::Read(PeerMessage::Bye),
1✔
2001
        ]);
1✔
2002

1✔
2003
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2004

1✔
2005
        let mut peer_loop_handler =
1✔
2006
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
1✔
2007
        peer_loop_handler
1✔
2008
            .run_wrapper(mock, from_main_rx_clone)
1✔
2009
            .await
1✔
2010
            .unwrap();
1✔
2011

1✔
2012
        assert_eq!(
1✔
2013
            2,
1✔
2014
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
2015
            "peer map must have length 2 after saying goodbye to peer 2"
1✔
2016
        );
1✔
2017
    }
1✔
2018

2019
    #[traced_test]
×
2020
    #[tokio::test]
2021
    async fn different_genesis_test() -> Result<()> {
1✔
2022
        // In this scenario a peer provides another genesis block than what has been
1✔
2023
        // hardcoded. This should lead to the closing of the connection to this peer
1✔
2024
        // and a ban.
1✔
2025

1✔
2026
        let network = Network::Main;
1✔
2027
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2028
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2029
        assert_eq!(1000, state_lock.cli().peer_tolerance);
1✔
2030
        let peer_address = get_dummy_socket_address(0);
1✔
2031

1✔
2032
        // Although the database is empty, `get_latest_block` still returns the genesis block,
1✔
2033
        // since that block is hardcoded.
1✔
2034
        let mut different_genesis_block = state_lock
1✔
2035
            .lock_guard()
1✔
2036
            .await
1✔
2037
            .chain
1✔
2038
            .archival_state()
1✔
2039
            .get_tip()
1✔
2040
            .await;
1✔
2041

1✔
2042
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
1✔
2043
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
1✔
2044
            &different_genesis_block,
1✔
2045
            Timestamp::hours(1),
1✔
2046
            StdRng::seed_from_u64(5550001).random(),
1✔
2047
        )
1✔
2048
        .await;
1✔
2049
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
1✔
2050
            block_1_with_different_genesis.try_into().unwrap(),
1✔
2051
        )))]);
1✔
2052

1✔
2053
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2054
            to_main_tx.clone(),
1✔
2055
            state_lock.clone(),
1✔
2056
            peer_address,
1✔
2057
            hsd,
1✔
2058
            true,
1✔
2059
            1,
1✔
2060
        );
1✔
2061
        let res = peer_loop_handler
1✔
2062
            .run_wrapper(mock, from_main_rx_clone)
1✔
2063
            .await;
1✔
2064
        assert!(
1✔
2065
            res.is_err(),
1✔
2066
            "run_wrapper must return failure when genesis is different"
1✔
2067
        );
1✔
2068

1✔
2069
        match to_main_rx1.recv().await {
1✔
2070
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2071
            _ => bail!("Must receive remove of peer block max height"),
1✔
2072
        }
1✔
2073

1✔
2074
        // Verify that no further message was sent to main loop
1✔
2075
        match to_main_rx1.try_recv() {
1✔
2076
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2077
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2078
        };
1✔
2079

1✔
2080
        drop(to_main_tx);
1✔
2081

1✔
2082
        let peer_standing = state_lock
1✔
2083
            .lock_guard()
1✔
2084
            .await
1✔
2085
            .net
1✔
2086
            .get_peer_standing_from_database(peer_address.ip())
1✔
2087
            .await;
1✔
2088
        assert_eq!(
1✔
2089
            -i32::from(state_lock.cli().peer_tolerance),
1✔
2090
            peer_standing.unwrap().standing
1✔
2091
        );
1✔
2092
        assert_eq!(
1✔
2093
            NegativePeerSanction::DifferentGenesis,
1✔
2094
            peer_standing.unwrap().latest_punishment.unwrap().0
1✔
2095
        );
1✔
2096

1✔
2097
        Ok(())
1✔
2098
    }
1✔
2099

2100
    #[traced_test]
×
2101
    #[tokio::test]
2102
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
1✔
2103
    {
1✔
2104
        let args = cli_args::Args::default();
1✔
2105
        let network = args.network;
1✔
2106
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
1✔
2107
            get_test_genesis_setup(network, 0, args).await?;
1✔
2108

1✔
2109
        let peer_address = get_dummy_socket_address(0);
1✔
2110
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
1✔
2111
        let peer_id = peer_handshake_data.instance_id;
1✔
2112
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2113
            to_main_tx,
1✔
2114
            state_lock.clone(),
1✔
2115
            peer_address,
1✔
2116
            peer_handshake_data,
1✔
2117
            true,
1✔
2118
            1,
1✔
2119
        );
1✔
2120
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2121
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
1✔
2122

1✔
2123
        let global_state = state_lock.lock_guard().await;
1✔
2124
        assert!(global_state
1✔
2125
            .net
1✔
2126
            .last_disconnection_time_of_peer(peer_id)
1✔
2127
            .is_none());
1✔
2128

1✔
2129
        drop(to_main_rx);
1✔
2130
        drop(from_main_tx);
1✔
2131

1✔
2132
        Ok(())
1✔
2133
    }
1✔
2134

2135
    #[traced_test]
×
2136
    #[tokio::test]
2137
    async fn block_without_valid_pow_test() -> Result<()> {
1✔
2138
        // In this scenario, a block without a valid PoW is received. This block should be rejected
1✔
2139
        // by the peer loop and a notification should never reach the main loop.
1✔
2140

1✔
2141
        let network = Network::Main;
1✔
2142
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2143
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2144
        let peer_address = get_dummy_socket_address(0);
1✔
2145
        let genesis_block: Block = state_lock
1✔
2146
            .lock_guard()
1✔
2147
            .await
1✔
2148
            .chain
1✔
2149
            .archival_state()
1✔
2150
            .get_tip()
1✔
2151
            .await;
1✔
2152

1✔
2153
        // Make a with hash above what the implied threshold from
1✔
2154
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
1✔
2155
            &genesis_block,
1✔
2156
            Timestamp::hours(1),
1✔
2157
            StdRng::seed_from_u64(5550001).random(),
1✔
2158
        )
1✔
2159
        .await;
1✔
2160

1✔
2161
        // This *probably* is invalid PoW -- and needs to be for this test to
1✔
2162
        // work.
1✔
2163
        block_without_valid_pow.set_header_nonce(Digest::default());
1✔
2164

1✔
2165
        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
1✔
2166
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
1✔
2167
        let mock = Mock::new(vec![
1✔
2168
            Action::Read(PeerMessage::Block(Box::new(
1✔
2169
                block_without_valid_pow.clone().try_into().unwrap(),
1✔
2170
            ))),
1✔
2171
            Action::Read(PeerMessage::Bye),
1✔
2172
        ]);
1✔
2173

1✔
2174
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2175

1✔
2176
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2177
            to_main_tx.clone(),
1✔
2178
            state_lock.clone(),
1✔
2179
            peer_address,
1✔
2180
            hsd,
1✔
2181
            true,
1✔
2182
            1,
1✔
2183
            block_without_valid_pow.header().timestamp,
1✔
2184
        );
1✔
2185
        peer_loop_handler
1✔
2186
            .run_wrapper(mock, from_main_rx_clone)
1✔
2187
            .await
1✔
2188
            .expect("sending (one) invalid block should not result in closed connection");
1✔
2189

1✔
2190
        match to_main_rx1.recv().await {
1✔
2191
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2192
            _ => bail!("Must receive remove of peer block max height"),
1✔
2193
        }
1✔
2194

1✔
2195
        // Verify that no further message was sent to main loop
1✔
2196
        match to_main_rx1.try_recv() {
1✔
2197
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2198
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2199
        };
1✔
2200

1✔
2201
        // We need to have the transmitter in scope until we have received from it
1✔
2202
        // otherwise the receiver will report the disconnected error when we attempt
1✔
2203
        // to read from it. And the purpose is to verify that the channel is empty,
1✔
2204
        // not that it has been closed.
1✔
2205
        drop(to_main_tx);
1✔
2206

1✔
2207
        // Verify that peer standing was stored in database
1✔
2208
        let standing = state_lock
1✔
2209
            .lock_guard()
1✔
2210
            .await
1✔
2211
            .net
1✔
2212
            .peer_databases
1✔
2213
            .peer_standings
1✔
2214
            .get(peer_address.ip())
1✔
2215
            .await
1✔
2216
            .unwrap();
1✔
2217
        assert!(
1✔
2218
            standing.standing < 0,
1✔
2219
            "Peer must be sanctioned for sending a bad block"
1✔
2220
        );
1✔
2221

1✔
2222
        Ok(())
1✔
2223
    }
1✔
2224

2225
    #[traced_test]
×
2226
    #[tokio::test]
2227
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
1✔
2228
        // The scenario tested here is that a client receives a block that is already
1✔
2229
        // known and stored. The expected behavior is to ignore the block and not send
1✔
2230
        // a message to the main task.
1✔
2231

1✔
2232
        let network = Network::Main;
1✔
2233
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
1✔
2234
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2235
        let peer_address = get_dummy_socket_address(0);
1✔
2236
        let genesis_block: Block = Block::genesis(network);
1✔
2237

1✔
2238
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
2239
        let block_1 =
1✔
2240
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
1✔
2241
        assert!(
1✔
2242
            block_1.is_valid(&genesis_block, now).await,
1✔
2243
            "Block must be valid for this test to make sense"
1✔
2244
        );
1✔
2245
        alice.set_new_tip(block_1.clone()).await?;
1✔
2246

1✔
2247
        let mock_peer_messages = Mock::new(vec![
1✔
2248
            Action::Read(PeerMessage::Block(Box::new(
1✔
2249
                block_1.clone().try_into().unwrap(),
1✔
2250
            ))),
1✔
2251
            Action::Read(PeerMessage::Bye),
1✔
2252
        ]);
1✔
2253

1✔
2254
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2255

1✔
2256
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2257
            to_main_tx.clone(),
1✔
2258
            alice.clone(),
1✔
2259
            peer_address,
1✔
2260
            hsd,
1✔
2261
            false,
1✔
2262
            1,
1✔
2263
            block_1.header().timestamp,
1✔
2264
        );
1✔
2265
        alice_peer_loop_handler
1✔
2266
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
1✔
2267
            .await?;
1✔
2268

1✔
2269
        match to_main_rx1.recv().await {
1✔
2270
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2271
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
1✔
2272
        }
1✔
2273
        match to_main_rx1.try_recv() {
1✔
2274
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2275
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2276
        };
1✔
2277
        drop(to_main_tx);
1✔
2278

1✔
2279
        if !alice.lock_guard().await.net.peer_map.is_empty() {
1✔
2280
            bail!("peer map must be empty after closing connection gracefully");
1✔
2281
        }
1✔
2282

1✔
2283
        Ok(())
1✔
2284
    }
1✔
2285

2286
    #[traced_test]
×
2287
    #[tokio::test]
2288
    async fn block_request_batch_simple() {
1✔
2289
        // Scenario: Six blocks (including genesis) are known. Peer requests
1✔
2290
        // from all possible starting points, and client responds with the
1✔
2291
        // correct list of blocks.
1✔
2292
        let network = Network::Main;
1✔
2293
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2294
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2295
                .await
1✔
2296
                .unwrap();
1✔
2297
        let genesis_block: Block = Block::genesis(network);
1✔
2298
        let peer_address = get_dummy_socket_address(0);
1✔
2299
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
2300
            fake_valid_sequence_of_blocks_for_tests(
1✔
2301
                &genesis_block,
1✔
2302
                Timestamp::hours(1),
1✔
2303
                StdRng::seed_from_u64(5550001).random(),
1✔
2304
            )
1✔
2305
            .await;
1✔
2306
        let blocks = vec![
1✔
2307
            genesis_block,
1✔
2308
            block_1,
1✔
2309
            block_2,
1✔
2310
            block_3,
1✔
2311
            block_4,
1✔
2312
            block_5.clone(),
1✔
2313
        ];
1✔
2314
        for block in blocks.iter().skip(1) {
5✔
2315
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
5✔
2316
        }
1✔
2317

1✔
2318
        let mmra = state_lock
1✔
2319
            .lock_guard()
1✔
2320
            .await
1✔
2321
            .chain
1✔
2322
            .archival_state()
1✔
2323
            .archival_block_mmr
1✔
2324
            .ammr()
1✔
2325
            .to_accumulator_async()
1✔
2326
            .await;
1✔
2327
        for i in 0..=4 {
6✔
2328
            let expected_response = {
5✔
2329
                let state = state_lock.lock_guard().await;
5✔
2330
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
5✔
2331
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
5✔
2332
                    .await
5✔
2333
                    .unwrap()
5✔
2334
            };
5✔
2335
            let mock = Mock::new(vec![
5✔
2336
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
5✔
2337
                    known_blocks: vec![blocks[i].hash()],
5✔
2338
                    max_response_len: 14,
5✔
2339
                    anchor: mmra.clone(),
5✔
2340
                })),
5✔
2341
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
5✔
2342
                Action::Read(PeerMessage::Bye),
5✔
2343
            ]);
5✔
2344
            let mut peer_loop_handler = PeerLoopHandler::new(
5✔
2345
                to_main_tx.clone(),
5✔
2346
                state_lock.clone(),
5✔
2347
                peer_address,
5✔
2348
                hsd.clone(),
5✔
2349
                false,
5✔
2350
                1,
5✔
2351
            );
5✔
2352

5✔
2353
            peer_loop_handler
5✔
2354
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
5✔
2355
                .await
5✔
2356
                .unwrap();
5✔
2357
        }
1✔
2358
    }
1✔
2359

2360
    #[traced_test]
×
2361
    #[tokio::test]
2362
    async fn block_request_batch_in_order_test() -> Result<()> {
1✔
2363
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2364
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
1✔
2365
        // are returned.
1✔
2366

1✔
2367
        let network = Network::Main;
1✔
2368
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2369
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2370
        let genesis_block: Block = Block::genesis(network);
1✔
2371
        let peer_address = get_dummy_socket_address(0);
1✔
2372
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2373
            &genesis_block,
1✔
2374
            Timestamp::hours(1),
1✔
2375
            StdRng::seed_from_u64(5550001).random(),
1✔
2376
        )
1✔
2377
        .await;
1✔
2378
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2379
            &block_1,
1✔
2380
            Timestamp::hours(1),
1✔
2381
            StdRng::seed_from_u64(5550002).random(),
1✔
2382
        )
1✔
2383
        .await;
1✔
2384
        assert_ne!(block_2_b.hash(), block_2_a.hash());
1✔
2385

1✔
2386
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2387
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2388
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2389
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2390
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2391

1✔
2392
        let anchor = state_lock
1✔
2393
            .lock_guard()
1✔
2394
            .await
1✔
2395
            .chain
1✔
2396
            .archival_state()
1✔
2397
            .archival_block_mmr
1✔
2398
            .ammr()
1✔
2399
            .to_accumulator_async()
1✔
2400
            .await;
1✔
2401
        let response_1 = {
1✔
2402
            let state_lock = state_lock.lock_guard().await;
1✔
2403
            PeerLoopHandler::batch_response(
1✔
2404
                &state_lock,
1✔
2405
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
1✔
2406
                &anchor,
1✔
2407
            )
1✔
2408
            .await
1✔
2409
            .unwrap()
1✔
2410
        };
1✔
2411

1✔
2412
        let mut mock = Mock::new(vec![
1✔
2413
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2414
                known_blocks: vec![genesis_block.hash()],
1✔
2415
                max_response_len: 14,
1✔
2416
                anchor: anchor.clone(),
1✔
2417
            })),
1✔
2418
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
1✔
2419
            Action::Read(PeerMessage::Bye),
1✔
2420
        ]);
1✔
2421

1✔
2422
        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
1✔
2423
            to_main_tx.clone(),
1✔
2424
            state_lock.clone(),
1✔
2425
            peer_address,
1✔
2426
            hsd.clone(),
1✔
2427
            false,
1✔
2428
            1,
1✔
2429
            block_3_a.header().timestamp,
1✔
2430
        );
1✔
2431

1✔
2432
        peer_loop_handler_1
1✔
2433
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
1✔
2434
            .await?;
1✔
2435

1✔
2436
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2437
        let response_2 = {
1✔
2438
            let state_lock = state_lock.lock_guard().await;
1✔
2439
            PeerLoopHandler::batch_response(
1✔
2440
                &state_lock,
1✔
2441
                vec![block_2_a, block_3_a.clone()],
1✔
2442
                &anchor,
1✔
2443
            )
1✔
2444
            .await
1✔
2445
            .unwrap()
1✔
2446
        };
1✔
2447
        mock = Mock::new(vec![
1✔
2448
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2449
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
1✔
2450
                max_response_len: 14,
1✔
2451
                anchor,
1✔
2452
            })),
1✔
2453
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
1✔
2454
            Action::Read(PeerMessage::Bye),
1✔
2455
        ]);
1✔
2456

1✔
2457
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2458
            to_main_tx.clone(),
1✔
2459
            state_lock.clone(),
1✔
2460
            peer_address,
1✔
2461
            hsd,
1✔
2462
            false,
1✔
2463
            1,
1✔
2464
            block_3_a.header().timestamp,
1✔
2465
        );
1✔
2466

1✔
2467
        peer_loop_handler_2
1✔
2468
            .run_wrapper(mock, from_main_rx_clone)
1✔
2469
            .await?;
1✔
2470

1✔
2471
        Ok(())
1✔
2472
    }
1✔
2473

2474
    #[traced_test]
×
2475
    #[tokio::test]
2476
    async fn block_request_batch_out_of_order_test() -> Result<()> {
1✔
2477
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2478
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
1✔
2479
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
1✔
2480
        // The blocks will be supplied in the correct order but starting from the first digest in
1✔
2481
        // the list that is known and canonical.
1✔
2482

1✔
2483
        let network = Network::Main;
1✔
2484
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2485
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2486
        let genesis_block = Block::genesis(network);
1✔
2487
        let peer_address = get_dummy_socket_address(0);
1✔
2488
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2489
            &genesis_block,
1✔
2490
            Timestamp::hours(1),
1✔
2491
            StdRng::seed_from_u64(5550001).random(),
1✔
2492
        )
1✔
2493
        .await;
1✔
2494
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2495
            &block_1,
1✔
2496
            Timestamp::hours(1),
1✔
2497
            StdRng::seed_from_u64(5550002).random(),
1✔
2498
        )
1✔
2499
        .await;
1✔
2500
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2501

1✔
2502
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2503
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2504
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2505
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2506
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2507

1✔
2508
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2509
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
1✔
2510
        expected_anchor.append(block_3_a.hash());
1✔
2511
        let state_anchor = state_lock
1✔
2512
            .lock_guard()
1✔
2513
            .await
1✔
2514
            .chain
1✔
2515
            .archival_state()
1✔
2516
            .archival_block_mmr
1✔
2517
            .ammr()
1✔
2518
            .to_accumulator_async()
1✔
2519
            .await;
1✔
2520
        assert_eq!(
1✔
2521
            expected_anchor, state_anchor,
1✔
2522
            "Catching assumption about MMRA in tip and in archival state"
1✔
2523
        );
1✔
2524

1✔
2525
        let response = {
1✔
2526
            let state_lock = state_lock.lock_guard().await;
1✔
2527
            PeerLoopHandler::batch_response(
1✔
2528
                &state_lock,
1✔
2529
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
1✔
2530
                &expected_anchor,
1✔
2531
            )
1✔
2532
            .await
1✔
2533
            .unwrap()
1✔
2534
        };
1✔
2535
        let mock = Mock::new(vec![
1✔
2536
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2537
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
1✔
2538
                max_response_len: 14,
1✔
2539
                anchor: expected_anchor,
1✔
2540
            })),
1✔
2541
            // Since genesis block is the 1st known in the list of known blocks,
1✔
2542
            // it's immediate descendent, block_1, is the first one returned.
1✔
2543
            Action::Write(PeerMessage::BlockResponseBatch(response)),
1✔
2544
            Action::Read(PeerMessage::Bye),
1✔
2545
        ]);
1✔
2546

1✔
2547
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2548
            to_main_tx.clone(),
1✔
2549
            state_lock.clone(),
1✔
2550
            peer_address,
1✔
2551
            hsd,
1✔
2552
            false,
1✔
2553
            1,
1✔
2554
            block_3_a.header().timestamp,
1✔
2555
        );
1✔
2556

1✔
2557
        peer_loop_handler_2
1✔
2558
            .run_wrapper(mock, from_main_rx_clone)
1✔
2559
            .await?;
1✔
2560

1✔
2561
        Ok(())
1✔
2562
    }
1✔
2563

2564
    #[traced_test]
×
2565
    #[tokio::test]
2566
    async fn request_unknown_height_doesnt_crash() {
1✔
2567
        // Scenario: Only genesis block is known. Peer requests block of height
1✔
2568
        // 2.
1✔
2569
        let network = Network::Main;
1✔
2570
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2571
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2572
                .await
1✔
2573
                .unwrap();
1✔
2574
        let peer_address = get_dummy_socket_address(0);
1✔
2575
        let mock = Mock::new(vec![
1✔
2576
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2577
            Action::Read(PeerMessage::Bye),
1✔
2578
        ]);
1✔
2579

1✔
2580
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2581
            to_main_tx.clone(),
1✔
2582
            state_lock.clone(),
1✔
2583
            peer_address,
1✔
2584
            hsd,
1✔
2585
            false,
1✔
2586
            1,
1✔
2587
        );
1✔
2588

1✔
2589
        // This will return error if seen read/write order does not match that of the
1✔
2590
        // mocked object.
1✔
2591
        peer_loop_handler
1✔
2592
            .run_wrapper(mock, from_main_rx_clone)
1✔
2593
            .await
1✔
2594
            .unwrap();
1✔
2595

1✔
2596
        // Verify that peer is sanctioned for this nonsense.
1✔
2597
        assert!(state_lock
1✔
2598
            .lock_guard()
1✔
2599
            .await
1✔
2600
            .net
1✔
2601
            .get_peer_standing_from_database(peer_address.ip())
1✔
2602
            .await
1✔
2603
            .unwrap()
1✔
2604
            .standing
1✔
2605
            .is_negative());
1✔
2606
    }
1✔
2607

2608
    #[traced_test]
×
2609
    #[tokio::test]
2610
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
1✔
2611
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2612
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
1✔
2613
        // returned.
1✔
2614

1✔
2615
        let network = Network::Main;
1✔
2616
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2617
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2618
        let genesis_block = Block::genesis(network);
1✔
2619
        let peer_address = get_dummy_socket_address(0);
1✔
2620

1✔
2621
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2622
            &genesis_block,
1✔
2623
            Timestamp::hours(1),
1✔
2624
            StdRng::seed_from_u64(5550001).random(),
1✔
2625
        )
1✔
2626
        .await;
1✔
2627
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2628
            &block_1,
1✔
2629
            Timestamp::hours(1),
1✔
2630
            StdRng::seed_from_u64(5550002).random(),
1✔
2631
        )
1✔
2632
        .await;
1✔
2633
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2634

1✔
2635
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2636
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2637
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2638
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2639
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2640

1✔
2641
        let mock = Mock::new(vec![
1✔
2642
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2643
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
1✔
2644
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
1✔
2645
            Action::Write(PeerMessage::Block(Box::new(
1✔
2646
                block_3_a.clone().try_into().unwrap(),
1✔
2647
            ))),
1✔
2648
            Action::Read(PeerMessage::Bye),
1✔
2649
        ]);
1✔
2650

1✔
2651
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2652
            to_main_tx.clone(),
1✔
2653
            state_lock.clone(),
1✔
2654
            peer_address,
1✔
2655
            hsd,
1✔
2656
            false,
1✔
2657
            1,
1✔
2658
            block_3_a.header().timestamp,
1✔
2659
        );
1✔
2660

1✔
2661
        // This will return error if seen read/write order does not match that of the
1✔
2662
        // mocked object.
1✔
2663
        peer_loop_handler
1✔
2664
            .run_wrapper(mock, from_main_rx_clone)
1✔
2665
            .await?;
1✔
2666

1✔
2667
        Ok(())
1✔
2668
    }
1✔
2669

2670
    #[traced_test]
×
2671
    #[tokio::test]
2672
    async fn receival_of_block_notification_height_1() {
1✔
2673
        // Scenario: client only knows genesis block. Then receives block
1✔
2674
        // notification of height 1. Must request block 1.
1✔
2675
        let network = Network::Main;
1✔
2676
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2677
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
1✔
2678
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2679
                .await
1✔
2680
                .unwrap();
1✔
2681
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2682
        let notification_height1 = (&block_1).into();
1✔
2683
        let mock = Mock::new(vec![
1✔
2684
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
1✔
2685
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
1✔
2686
            Action::Read(PeerMessage::Bye),
1✔
2687
        ]);
1✔
2688

1✔
2689
        let peer_address = get_dummy_socket_address(0);
1✔
2690
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2691
            to_main_tx.clone(),
1✔
2692
            state_lock.clone(),
1✔
2693
            peer_address,
1✔
2694
            hsd,
1✔
2695
            false,
1✔
2696
            1,
1✔
2697
            block_1.header().timestamp,
1✔
2698
        );
1✔
2699
        peer_loop_handler
1✔
2700
            .run_wrapper(mock, from_main_rx_clone)
1✔
2701
            .await
1✔
2702
            .unwrap();
1✔
2703

1✔
2704
        drop(to_main_rx1);
1✔
2705
    }
1✔
2706

2707
    #[traced_test]
×
2708
    #[tokio::test]
2709
    async fn receive_block_request_by_height_block_7() {
1✔
2710
        // Scenario: client only knows blocks up to height 7. Then receives block-
1✔
2711
        // request-by-height for height 7. Must respond with block 7.
1✔
2712
        let network = Network::Main;
1✔
2713
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2714
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
1✔
2715
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2716
                .await
1✔
2717
                .unwrap();
1✔
2718
        let genesis_block = Block::genesis(network);
1✔
2719
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
1✔
2720
            &genesis_block,
1✔
2721
            Timestamp::hours(1),
1✔
2722
            rng.random(),
1✔
2723
        )
1✔
2724
        .await;
1✔
2725
        let block7 = blocks.last().unwrap().to_owned();
1✔
2726
        let tip_height: u64 = block7.header().height.into();
1✔
2727
        assert_eq!(7, tip_height);
1✔
2728

1✔
2729
        for block in &blocks {
8✔
2730
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
7✔
2731
        }
1✔
2732

1✔
2733
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
1✔
2734
        let mock = Mock::new(vec![
1✔
2735
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
1✔
2736
            Action::Write(block7_response),
1✔
2737
            Action::Read(PeerMessage::Bye),
1✔
2738
        ]);
1✔
2739

1✔
2740
        let peer_address = get_dummy_socket_address(0);
1✔
2741
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2742
            to_main_tx.clone(),
1✔
2743
            state_lock.clone(),
1✔
2744
            peer_address,
1✔
2745
            hsd,
1✔
2746
            false,
1✔
2747
            1,
1✔
2748
        );
1✔
2749
        peer_loop_handler
1✔
2750
            .run_wrapper(mock, from_main_rx_clone)
1✔
2751
            .await
1✔
2752
            .unwrap();
1✔
2753

1✔
2754
        drop(to_main_rx1);
1✔
2755
    }
1✔
2756

2757
    #[traced_test]
×
2758
    #[tokio::test]
2759
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
1✔
2760
        // Scenario: client only knows genesis block. Then receives block 1.
1✔
2761

1✔
2762
        let network = Network::Main;
1✔
2763
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2764
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2765
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2766
        let peer_address = get_dummy_socket_address(0);
1✔
2767

1✔
2768
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2769
        let mock = Mock::new(vec![
1✔
2770
            Action::Read(PeerMessage::Block(Box::new(
1✔
2771
                block_1.clone().try_into().unwrap(),
1✔
2772
            ))),
1✔
2773
            Action::Read(PeerMessage::Bye),
1✔
2774
        ]);
1✔
2775

1✔
2776
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2777
            to_main_tx.clone(),
1✔
2778
            state_lock.clone(),
1✔
2779
            peer_address,
1✔
2780
            hsd,
1✔
2781
            false,
1✔
2782
            1,
1✔
2783
            block_1.header().timestamp,
1✔
2784
        );
1✔
2785
        peer_loop_handler
1✔
2786
            .run_wrapper(mock, from_main_rx_clone)
1✔
2787
            .await?;
1✔
2788

1✔
2789
        // Verify that a block was sent to `main_loop`
1✔
2790
        match to_main_rx1.recv().await {
1✔
2791
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
1✔
2792
            _ => bail!("Did not find msg sent to main task"),
1✔
2793
        };
1✔
2794

1✔
2795
        match to_main_rx1.recv().await {
1✔
2796
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2797
            _ => bail!("Must receive remove of peer block max height"),
1✔
2798
        }
1✔
2799

1✔
2800
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2801
            bail!("peer map must be empty after closing connection gracefully");
1✔
2802
        }
1✔
2803

1✔
2804
        Ok(())
1✔
2805
    }
1✔
2806

2807
    #[traced_test]
×
2808
    #[tokio::test]
2809
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
1✔
2810
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
2811
        // receives block 2, meaning that block 1 will have to be requested.
1✔
2812

1✔
2813
        let network = Network::Main;
1✔
2814
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2815
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2816
        let peer_address = get_dummy_socket_address(0);
1✔
2817
        let genesis_block: Block = state_lock
1✔
2818
            .lock_guard()
1✔
2819
            .await
1✔
2820
            .chain
1✔
2821
            .archival_state()
1✔
2822
            .get_tip()
1✔
2823
            .await;
1✔
2824
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
1✔
2825
            &genesis_block,
1✔
2826
            Timestamp::hours(1),
1✔
2827
            StdRng::seed_from_u64(5550001).random(),
1✔
2828
        )
1✔
2829
        .await;
1✔
2830

1✔
2831
        let mock = Mock::new(vec![
1✔
2832
            Action::Read(PeerMessage::Block(Box::new(
1✔
2833
                block_2.clone().try_into().unwrap(),
1✔
2834
            ))),
1✔
2835
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
2836
            Action::Read(PeerMessage::Block(Box::new(
1✔
2837
                block_1.clone().try_into().unwrap(),
1✔
2838
            ))),
1✔
2839
            Action::Read(PeerMessage::Bye),
1✔
2840
        ]);
1✔
2841

1✔
2842
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2843
            to_main_tx.clone(),
1✔
2844
            state_lock.clone(),
1✔
2845
            peer_address,
1✔
2846
            hsd,
1✔
2847
            true,
1✔
2848
            1,
1✔
2849
            block_2.header().timestamp,
1✔
2850
        );
1✔
2851
        peer_loop_handler
1✔
2852
            .run_wrapper(mock, from_main_rx_clone)
1✔
2853
            .await?;
1✔
2854

1✔
2855
        match to_main_rx1.recv().await {
1✔
2856
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
2857
                if blocks[0].hash() != block_1.hash() {
1✔
2858
                    bail!("1st received block by main loop must be block 1");
1✔
2859
                }
1✔
2860
                if blocks[1].hash() != block_2.hash() {
1✔
2861
                    bail!("2nd received block by main loop must be block 2");
1✔
2862
                }
1✔
2863
            }
1✔
2864
            _ => bail!("Did not find msg sent to main task 1"),
1✔
2865
        };
1✔
2866
        match to_main_rx1.recv().await {
1✔
2867
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2868
            _ => bail!("Must receive remove of peer block max height"),
1✔
2869
        }
1✔
2870

1✔
2871
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2872
            bail!("peer map must be empty after closing connection gracefully");
1✔
2873
        }
1✔
2874

1✔
2875
        Ok(())
1✔
2876
    }
1✔
2877

2878
    #[traced_test]
×
2879
    #[tokio::test]
2880
    async fn prevent_ram_exhaustion_test() -> Result<()> {
1✔
2881
        // In this scenario the peer sends more blocks than the client allows to store in the
1✔
2882
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
1✔
2883
        // process as the alternative is that the program will crash because it runs out of RAM.
1✔
2884

1✔
2885
        let network = Network::Main;
1✔
2886
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2887
        let (
1✔
2888
            _peer_broadcast_tx,
1✔
2889
            from_main_rx_clone,
1✔
2890
            to_main_tx,
1✔
2891
            mut to_main_rx1,
1✔
2892
            mut state_lock,
1✔
2893
            _hsd,
1✔
2894
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
2895
        let genesis_block = Block::genesis(network);
1✔
2896

1✔
2897
        // Restrict max number of blocks held in memory to 2.
1✔
2898
        let mut cli = state_lock.cli().clone();
1✔
2899
        cli.sync_mode_threshold = 2;
1✔
2900
        state_lock.set_cli(cli).await;
1✔
2901

1✔
2902
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
1✔
2903
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2904
            &genesis_block,
1✔
2905
            Timestamp::hours(1),
1✔
2906
            rng.random(),
1✔
2907
        )
1✔
2908
        .await;
1✔
2909
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2910

1✔
2911
        let mock = Mock::new(vec![
1✔
2912
            Action::Read(PeerMessage::Block(Box::new(
1✔
2913
                block_4.clone().try_into().unwrap(),
1✔
2914
            ))),
1✔
2915
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
2916
            Action::Read(PeerMessage::Block(Box::new(
1✔
2917
                block_3.clone().try_into().unwrap(),
1✔
2918
            ))),
1✔
2919
            Action::Read(PeerMessage::Bye),
1✔
2920
        ]);
1✔
2921

1✔
2922
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2923
            to_main_tx.clone(),
1✔
2924
            state_lock.clone(),
1✔
2925
            peer_address1,
1✔
2926
            hsd1,
1✔
2927
            true,
1✔
2928
            1,
1✔
2929
            block_4.header().timestamp,
1✔
2930
        );
1✔
2931
        peer_loop_handler
1✔
2932
            .run_wrapper(mock, from_main_rx_clone)
1✔
2933
            .await?;
1✔
2934

1✔
2935
        match to_main_rx1.recv().await {
1✔
2936
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2937
            _ => bail!("Must receive remove of peer block max height"),
1✔
2938
        }
1✔
2939

1✔
2940
        // Verify that no block is sent to main loop.
1✔
2941
        match to_main_rx1.try_recv() {
1✔
2942
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2943
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
1✔
2944
        };
1✔
2945
        drop(to_main_tx);
1✔
2946

1✔
2947
        // Verify that peer is sanctioned for failed fork reconciliation attempt
1✔
2948
        assert!(state_lock
1✔
2949
            .lock_guard()
1✔
2950
            .await
1✔
2951
            .net
1✔
2952
            .get_peer_standing_from_database(peer_address1.ip())
1✔
2953
            .await
1✔
2954
            .unwrap()
1✔
2955
            .standing
1✔
2956
            .is_negative());
1✔
2957

1✔
2958
        Ok(())
1✔
2959
    }
1✔
2960

2961
    #[traced_test]
×
2962
    #[tokio::test]
2963
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
1✔
2964
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
2965
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
1✔
2966

1✔
2967
        let network = Network::Main;
1✔
2968
        let (
1✔
2969
            _peer_broadcast_tx,
1✔
2970
            from_main_rx_clone,
1✔
2971
            to_main_tx,
1✔
2972
            mut to_main_rx1,
1✔
2973
            mut state_lock,
1✔
2974
            hsd,
1✔
2975
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2976
            .await
1✔
2977
            .unwrap();
1✔
2978
        let peer_address: SocketAddr = get_dummy_socket_address(0);
1✔
2979
        let genesis_block = Block::genesis(network);
1✔
2980
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2981
            &genesis_block,
1✔
2982
            Timestamp::hours(1),
1✔
2983
            StdRng::seed_from_u64(5550001).random(),
1✔
2984
        )
1✔
2985
        .await;
1✔
2986
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
1✔
2987

1✔
2988
        let mock = Mock::new(vec![
1✔
2989
            Action::Read(PeerMessage::Block(Box::new(
1✔
2990
                block_4.clone().try_into().unwrap(),
1✔
2991
            ))),
1✔
2992
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
2993
            Action::Read(PeerMessage::Block(Box::new(
1✔
2994
                block_3.clone().try_into().unwrap(),
1✔
2995
            ))),
1✔
2996
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
2997
            Action::Read(PeerMessage::Block(Box::new(
1✔
2998
                block_2.clone().try_into().unwrap(),
1✔
2999
            ))),
1✔
3000
            Action::Read(PeerMessage::Bye),
1✔
3001
        ]);
1✔
3002

1✔
3003
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3004
            to_main_tx.clone(),
1✔
3005
            state_lock.clone(),
1✔
3006
            peer_address,
1✔
3007
            hsd,
1✔
3008
            true,
1✔
3009
            1,
1✔
3010
            block_4.header().timestamp,
1✔
3011
        );
1✔
3012
        peer_loop_handler
1✔
3013
            .run_wrapper(mock, from_main_rx_clone)
1✔
3014
            .await
1✔
3015
            .unwrap();
1✔
3016

1✔
3017
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
1✔
3018
            panic!("Did not find msg sent to main task");
1✔
3019
        };
1✔
3020
        assert_eq!(blocks[0].hash(), block_2.hash());
1✔
3021
        assert_eq!(blocks[1].hash(), block_3.hash());
1✔
3022
        assert_eq!(blocks[2].hash(), block_4.hash());
1✔
3023

1✔
3024
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
1✔
3025
            panic!("Must receive remove of peer block max height");
1✔
3026
        };
1✔
3027

1✔
3028
        assert!(
1✔
3029
            state_lock.lock_guard().await.net.peer_map.is_empty(),
1✔
3030
            "peer map must be empty after closing connection gracefully"
1✔
3031
        );
1✔
3032
    }
1✔
3033

3034
    #[traced_test]
×
3035
    #[tokio::test]
3036
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
1✔
3037
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
3038
        // receives block 3, meaning that block 2 and 1 will have to be requested.
1✔
3039

1✔
3040
        let network = Network::Main;
1✔
3041
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
3042
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3043
        let peer_address = get_dummy_socket_address(0);
1✔
3044
        let genesis_block = Block::genesis(network);
1✔
3045

1✔
3046
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
1✔
3047
            &genesis_block,
1✔
3048
            Timestamp::hours(1),
1✔
3049
            StdRng::seed_from_u64(5550001).random(),
1✔
3050
        )
1✔
3051
        .await;
1✔
3052

1✔
3053
        let mock = Mock::new(vec![
1✔
3054
            Action::Read(PeerMessage::Block(Box::new(
1✔
3055
                block_3.clone().try_into().unwrap(),
1✔
3056
            ))),
1✔
3057
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3058
            Action::Read(PeerMessage::Block(Box::new(
1✔
3059
                block_2.clone().try_into().unwrap(),
1✔
3060
            ))),
1✔
3061
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
3062
            Action::Read(PeerMessage::Block(Box::new(
1✔
3063
                block_1.clone().try_into().unwrap(),
1✔
3064
            ))),
1✔
3065
            Action::Read(PeerMessage::Bye),
1✔
3066
        ]);
1✔
3067

1✔
3068
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3069
            to_main_tx.clone(),
1✔
3070
            state_lock.clone(),
1✔
3071
            peer_address,
1✔
3072
            hsd,
1✔
3073
            true,
1✔
3074
            1,
1✔
3075
            block_3.header().timestamp,
1✔
3076
        );
1✔
3077
        peer_loop_handler
1✔
3078
            .run_wrapper(mock, from_main_rx_clone)
1✔
3079
            .await?;
1✔
3080

1✔
3081
        match to_main_rx1.recv().await {
1✔
3082
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3083
                if blocks[0].hash() != block_1.hash() {
1✔
3084
                    bail!("1st received block by main loop must be block 1");
1✔
3085
                }
1✔
3086
                if blocks[1].hash() != block_2.hash() {
1✔
3087
                    bail!("2nd received block by main loop must be block 2");
1✔
3088
                }
1✔
3089
                if blocks[2].hash() != block_3.hash() {
1✔
3090
                    bail!("3rd received block by main loop must be block 3");
1✔
3091
                }
1✔
3092
            }
1✔
3093
            _ => bail!("Did not find msg sent to main task"),
1✔
3094
        };
1✔
3095
        match to_main_rx1.recv().await {
1✔
3096
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3097
            _ => bail!("Must receive remove of peer block max height"),
1✔
3098
        }
1✔
3099

1✔
3100
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3101
            bail!("peer map must be empty after closing connection gracefully");
1✔
3102
        }
1✔
3103

1✔
3104
        Ok(())
1✔
3105
    }
1✔
3106

3107
    #[traced_test]
×
3108
    #[tokio::test]
3109
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
1✔
3110
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
3111
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3112
        // But the requests are interrupted by the peer sending another message: a new block
1✔
3113
        // notification.
1✔
3114

1✔
3115
        let network = Network::Main;
1✔
3116
        let (
1✔
3117
            _peer_broadcast_tx,
1✔
3118
            from_main_rx_clone,
1✔
3119
            to_main_tx,
1✔
3120
            mut to_main_rx1,
1✔
3121
            mut state_lock,
1✔
3122
            hsd,
1✔
3123
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3124
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
1✔
3125
        let genesis_block: Block = state_lock
1✔
3126
            .lock_guard()
1✔
3127
            .await
1✔
3128
            .chain
1✔
3129
            .archival_state()
1✔
3130
            .get_tip()
1✔
3131
            .await;
1✔
3132

1✔
3133
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
3134
            fake_valid_sequence_of_blocks_for_tests(
1✔
3135
                &genesis_block,
1✔
3136
                Timestamp::hours(1),
1✔
3137
                StdRng::seed_from_u64(5550001).random(),
1✔
3138
            )
1✔
3139
            .await;
1✔
3140
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3141

1✔
3142
        let mock = Mock::new(vec![
1✔
3143
            Action::Read(PeerMessage::Block(Box::new(
1✔
3144
                block_4.clone().try_into().unwrap(),
1✔
3145
            ))),
1✔
3146
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3147
            Action::Read(PeerMessage::Block(Box::new(
1✔
3148
                block_3.clone().try_into().unwrap(),
1✔
3149
            ))),
1✔
3150
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3151
            //
1✔
3152
            // Now make the interruption of the block reconciliation process
1✔
3153
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
1✔
3154
            //
1✔
3155
            // Complete the block reconciliation process by requesting the last block
1✔
3156
            // in this process, to get back to a mutually known block.
1✔
3157
            Action::Read(PeerMessage::Block(Box::new(
1✔
3158
                block_2.clone().try_into().unwrap(),
1✔
3159
            ))),
1✔
3160
            //
1✔
3161
            // Then anticipate the request of the block that was announced
1✔
3162
            // in the interruption.
1✔
3163
            // Note that we cannot anticipate the response, as only the main
1✔
3164
            // task writes to the database. And the database needs to be updated
1✔
3165
            // for the handling of block 5 to be done correctly.
1✔
3166
            Action::Write(PeerMessage::BlockRequestByHeight(
1✔
3167
                block_5.kernel.header.height,
1✔
3168
            )),
1✔
3169
            Action::Read(PeerMessage::Bye),
1✔
3170
        ]);
1✔
3171

1✔
3172
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3173
            to_main_tx.clone(),
1✔
3174
            state_lock.clone(),
1✔
3175
            peer_socket_address,
1✔
3176
            hsd,
1✔
3177
            false,
1✔
3178
            1,
1✔
3179
            block_5.header().timestamp,
1✔
3180
        );
1✔
3181
        peer_loop_handler
1✔
3182
            .run_wrapper(mock, from_main_rx_clone)
1✔
3183
            .await?;
1✔
3184

1✔
3185
        match to_main_rx1.recv().await {
1✔
3186
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3187
                if blocks[0].hash() != block_2.hash() {
1✔
3188
                    bail!("1st received block by main loop must be block 1");
1✔
3189
                }
1✔
3190
                if blocks[1].hash() != block_3.hash() {
1✔
3191
                    bail!("2nd received block by main loop must be block 2");
1✔
3192
                }
1✔
3193
                if blocks[2].hash() != block_4.hash() {
1✔
3194
                    bail!("3rd received block by main loop must be block 3");
1✔
3195
                }
1✔
3196
            }
1✔
3197
            _ => bail!("Did not find msg sent to main task"),
1✔
3198
        };
1✔
3199
        match to_main_rx1.recv().await {
1✔
3200
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3201
            _ => bail!("Must receive remove of peer block max height"),
1✔
3202
        }
1✔
3203

1✔
3204
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3205
            bail!("peer map must be empty after closing connection gracefully");
1✔
3206
        }
1✔
3207

1✔
3208
        Ok(())
1✔
3209
    }
1✔
3210

3211
    #[traced_test]
×
3212
    #[tokio::test]
3213
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
1✔
3214
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
1✔
3215
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3216
        // But the requests are interrupted by the peer sending another message: a request
1✔
3217
        // for a list of peers.
1✔
3218

1✔
3219
        let network = Network::Main;
1✔
3220
        let (
1✔
3221
            _peer_broadcast_tx,
1✔
3222
            from_main_rx_clone,
1✔
3223
            to_main_tx,
1✔
3224
            mut to_main_rx1,
1✔
3225
            mut state_lock,
1✔
3226
            _hsd,
1✔
3227
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
3228
        let genesis_block = Block::genesis(network);
1✔
3229
        let peer_infos: Vec<PeerInfo> = state_lock
1✔
3230
            .lock_guard()
1✔
3231
            .await
1✔
3232
            .net
1✔
3233
            .peer_map
1✔
3234
            .clone()
1✔
3235
            .into_values()
1✔
3236
            .collect::<Vec<_>>();
1✔
3237

1✔
3238
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
3239
            &genesis_block,
1✔
3240
            Timestamp::hours(1),
1✔
3241
            StdRng::seed_from_u64(5550001).random(),
1✔
3242
        )
1✔
3243
        .await;
1✔
3244
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3245

1✔
3246
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3247
        let expected_peer_list_resp = vec![
1✔
3248
            (
1✔
3249
                peer_infos[0].listen_address().unwrap(),
1✔
3250
                peer_infos[0].instance_id(),
1✔
3251
            ),
1✔
3252
            (sa_1, hsd_1.instance_id),
1✔
3253
        ];
1✔
3254
        let mock = Mock::new(vec![
1✔
3255
            Action::Read(PeerMessage::Block(Box::new(
1✔
3256
                block_4.clone().try_into().unwrap(),
1✔
3257
            ))),
1✔
3258
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3259
            Action::Read(PeerMessage::Block(Box::new(
1✔
3260
                block_3.clone().try_into().unwrap(),
1✔
3261
            ))),
1✔
3262
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3263
            //
1✔
3264
            // Now make the interruption of the block reconciliation process
1✔
3265
            Action::Read(PeerMessage::PeerListRequest),
1✔
3266
            //
1✔
3267
            // Answer the request for a peer list
1✔
3268
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
1✔
3269
            //
1✔
3270
            // Complete the block reconciliation process by requesting the last block
1✔
3271
            // in this process, to get back to a mutually known block.
1✔
3272
            Action::Read(PeerMessage::Block(Box::new(
1✔
3273
                block_2.clone().try_into().unwrap(),
1✔
3274
            ))),
1✔
3275
            Action::Read(PeerMessage::Bye),
1✔
3276
        ]);
1✔
3277

1✔
3278
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3279
            to_main_tx,
1✔
3280
            state_lock.clone(),
1✔
3281
            sa_1,
1✔
3282
            hsd_1,
1✔
3283
            true,
1✔
3284
            1,
1✔
3285
            block_4.header().timestamp,
1✔
3286
        );
1✔
3287
        peer_loop_handler
1✔
3288
            .run_wrapper(mock, from_main_rx_clone)
1✔
3289
            .await?;
1✔
3290

1✔
3291
        // Verify that blocks are sent to `main_loop` in expected ordering
1✔
3292
        match to_main_rx1.recv().await {
1✔
3293
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3294
                if blocks[0].hash() != block_2.hash() {
1✔
3295
                    bail!("1st received block by main loop must be block 1");
1✔
3296
                }
1✔
3297
                if blocks[1].hash() != block_3.hash() {
1✔
3298
                    bail!("2nd received block by main loop must be block 2");
1✔
3299
                }
1✔
3300
                if blocks[2].hash() != block_4.hash() {
1✔
3301
                    bail!("3rd received block by main loop must be block 3");
1✔
3302
                }
1✔
3303
            }
1✔
3304
            _ => bail!("Did not find msg sent to main task"),
1✔
3305
        };
1✔
3306

1✔
3307
        assert_eq!(
1✔
3308
            1,
1✔
3309
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
3310
            "One peer must remain in peer list after peer_1 closed gracefully"
1✔
3311
        );
1✔
3312

1✔
3313
        Ok(())
1✔
3314
    }
1✔
3315

3316
    #[traced_test]
×
3317
    #[tokio::test]
3318
    async fn empty_mempool_request_tx_test() {
1✔
3319
        // In this scenario the client receives a transaction notification from
1✔
3320
        // a peer of a transaction it doesn't know; the client must then request it.
1✔
3321

1✔
3322
        let network = Network::Main;
1✔
3323
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
1✔
3324
            get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3325
                .await
1✔
3326
                .unwrap();
1✔
3327

1✔
3328
        let spending_key = state_lock
1✔
3329
            .lock_guard()
1✔
3330
            .await
1✔
3331
            .wallet_state
1✔
3332
            .wallet_entropy
1✔
3333
            .nth_symmetric_key_for_tests(0);
1✔
3334
        let genesis_block = Block::genesis(network);
1✔
3335
        let now = genesis_block.kernel.header.timestamp;
1✔
3336
        let (transaction_1, _, _change_output) = state_lock
1✔
3337
            .lock_guard()
1✔
3338
            .await
1✔
3339
            .create_transaction_with_prover_capability(
1✔
3340
                Default::default(),
1✔
3341
                spending_key.into(),
1✔
3342
                UtxoNotificationMedium::OffChain,
1✔
3343
                NativeCurrencyAmount::coins(0),
1✔
3344
                now,
1✔
3345
                TxProvingCapability::ProofCollection,
1✔
3346
                &TritonVmJobQueue::dummy(),
1✔
3347
            )
1✔
3348
            .await
1✔
3349
            .unwrap();
1✔
3350

1✔
3351
        // Build the resulting transaction notification
1✔
3352
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3353
        let mock = Mock::new(vec![
1✔
3354
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3355
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3356
            Action::Read(PeerMessage::Transaction(Box::new(
1✔
3357
                (&transaction_1).try_into().unwrap(),
1✔
3358
            ))),
1✔
3359
            Action::Read(PeerMessage::Bye),
1✔
3360
        ]);
1✔
3361

1✔
3362
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3363

1✔
3364
        // Mock a timestamp to allow transaction to be considered valid
1✔
3365
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3366
            to_main_tx,
1✔
3367
            state_lock.clone(),
1✔
3368
            get_dummy_socket_address(0),
1✔
3369
            hsd_1.clone(),
1✔
3370
            true,
1✔
3371
            1,
1✔
3372
            now,
1✔
3373
        );
1✔
3374

1✔
3375
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3376

1✔
3377
        assert!(
1✔
3378
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3379
            "Mempool must be empty at init"
1✔
3380
        );
1✔
3381
        peer_loop_handler
1✔
3382
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3383
            .await
1✔
3384
            .unwrap();
1✔
3385

1✔
3386
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
1✔
3387
        // by the `main_loop`.
1✔
3388
        match to_main_rx1.recv().await {
1✔
3389
            Some(PeerTaskToMain::Transaction(_)) => (),
1✔
3390
            _ => panic!("Must receive remove of peer block max height"),
1✔
3391
        };
1✔
3392
    }
1✔
3393

3394
    #[traced_test]
×
3395
    #[tokio::test]
3396
    async fn populated_mempool_request_tx_test() -> Result<()> {
1✔
3397
        // In this scenario the peer is informed of a transaction that it already knows
1✔
3398

1✔
3399
        let network = Network::Main;
1✔
3400
        let (
1✔
3401
            _peer_broadcast_tx,
1✔
3402
            from_main_rx_clone,
1✔
3403
            to_main_tx,
1✔
3404
            mut to_main_rx1,
1✔
3405
            mut state_lock,
1✔
3406
            _hsd,
1✔
3407
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3408
            .await
1✔
3409
            .unwrap();
1✔
3410
        let spending_key = state_lock
1✔
3411
            .lock_guard()
1✔
3412
            .await
1✔
3413
            .wallet_state
1✔
3414
            .wallet_entropy
1✔
3415
            .nth_symmetric_key_for_tests(0);
1✔
3416

1✔
3417
        let genesis_block = Block::genesis(network);
1✔
3418
        let now = genesis_block.kernel.header.timestamp;
1✔
3419
        let (transaction_1, _, _change_output) = state_lock
1✔
3420
            .lock_guard()
1✔
3421
            .await
1✔
3422
            .create_transaction_with_prover_capability(
1✔
3423
                Default::default(),
1✔
3424
                spending_key.into(),
1✔
3425
                UtxoNotificationMedium::OffChain,
1✔
3426
                NativeCurrencyAmount::coins(0),
1✔
3427
                now,
1✔
3428
                TxProvingCapability::ProofCollection,
1✔
3429
                &TritonVmJobQueue::dummy(),
1✔
3430
            )
1✔
3431
            .await
1✔
3432
            .unwrap();
1✔
3433

1✔
3434
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3435
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
3436
            to_main_tx,
1✔
3437
            state_lock.clone(),
1✔
3438
            get_dummy_socket_address(0),
1✔
3439
            hsd_1.clone(),
1✔
3440
            true,
1✔
3441
            1,
1✔
3442
        );
1✔
3443
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3444

1✔
3445
        assert!(
1✔
3446
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3447
            "Mempool must be empty at init"
1✔
3448
        );
1✔
3449
        state_lock
1✔
3450
            .lock_guard_mut()
1✔
3451
            .await
1✔
3452
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
1✔
3453
            .await;
1✔
3454
        assert!(
1✔
3455
            !state_lock.lock_guard().await.mempool.is_empty(),
1✔
3456
            "Mempool must be non-empty after insertion"
1✔
3457
        );
1✔
3458

1✔
3459
        // Run the peer loop and verify expected exchange -- namely that the
1✔
3460
        // tx notification is received and the the transaction is *not*
1✔
3461
        // requested.
1✔
3462
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3463
        let mock = Mock::new(vec![
1✔
3464
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3465
            Action::Read(PeerMessage::Bye),
1✔
3466
        ]);
1✔
3467
        peer_loop_handler
1✔
3468
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3469
            .await
1✔
3470
            .unwrap();
1✔
3471

1✔
3472
        // nothing is allowed to be sent to `main_loop`
1✔
3473
        match to_main_rx1.try_recv() {
1✔
3474
            Err(TryRecvError::Empty) => (),
1✔
3475
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
1✔
3476
            Ok(_) => panic!("to_main channel must be empty"),
1✔
3477
        };
1✔
3478
        Ok(())
1✔
3479
    }
1✔
3480

3481
    mod block_proposals {
3482
        use super::*;
3483
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3484

3485
        struct TestSetup {
3486
            peer_loop_handler: PeerLoopHandler,
3487
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3488
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3489
            peer_state: MutablePeerState,
3490
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3491
            genesis_block: Block,
3492
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3493
        }
3494

3495
        async fn genesis_setup(network: Network) -> TestSetup {
2✔
3496
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
2✔
3497
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2✔
3498
                    .await
2✔
3499
                    .unwrap();
2✔
3500
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3501
            let peer_loop_handler = PeerLoopHandler::new(
2✔
3502
                to_main_tx.clone(),
2✔
3503
                alice.clone(),
2✔
3504
                get_dummy_socket_address(0),
2✔
3505
                peer_hsd.clone(),
2✔
3506
                true,
2✔
3507
                1,
2✔
3508
            );
2✔
3509
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
2✔
3510

2✔
3511
            // (peer_loop_handler, to_main_rx1)
2✔
3512
            TestSetup {
2✔
3513
                peer_broadcast_tx,
2✔
3514
                peer_loop_handler,
2✔
3515
                to_main_rx,
2✔
3516
                from_main_rx,
2✔
3517
                peer_state,
2✔
3518
                to_main_tx,
2✔
3519
                genesis_block: Block::genesis(network),
2✔
3520
            }
2✔
3521
        }
2✔
3522

3523
        #[traced_test]
×
3524
        #[tokio::test]
3525
        async fn accept_block_proposal_height_one() {
1✔
3526
            // Node knows genesis block, receives a block proposal for block 1
1✔
3527
            // and must accept this. Verify that main loop is informed of block
1✔
3528
            // proposal.
1✔
3529
            let TestSetup {
1✔
3530
                peer_broadcast_tx,
1✔
3531
                mut peer_loop_handler,
1✔
3532
                mut to_main_rx,
1✔
3533
                from_main_rx,
1✔
3534
                mut peer_state,
1✔
3535
                to_main_tx,
1✔
3536
                genesis_block,
1✔
3537
            } = genesis_setup(Network::Main).await;
1✔
3538
            let block1 = fake_valid_block_for_tests(
1✔
3539
                &peer_loop_handler.global_state_lock,
1✔
3540
                StdRng::seed_from_u64(5550001).random(),
1✔
3541
            )
1✔
3542
            .await;
1✔
3543

1✔
3544
            let mock = Mock::new(vec![
1✔
3545
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
1✔
3546
                Action::Read(PeerMessage::Bye),
1✔
3547
            ]);
1✔
3548
            peer_loop_handler
1✔
3549
                .run(mock, from_main_rx, &mut peer_state)
1✔
3550
                .await
1✔
3551
                .unwrap();
1✔
3552

1✔
3553
            match to_main_rx.try_recv().unwrap() {
1✔
3554
                PeerTaskToMain::BlockProposal(block) => {
1✔
3555
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
1✔
3556
                }
1✔
3557
                _ => panic!("Expected main loop to be informed of block proposal"),
1✔
3558
            };
1✔
3559

1✔
3560
            drop(to_main_tx);
1✔
3561
            drop(peer_broadcast_tx);
1✔
3562
        }
1✔
3563

3564
        #[traced_test]
×
3565
        #[tokio::test]
3566
        async fn accept_block_proposal_notification_height_one() {
1✔
3567
            // Node knows genesis block, receives a block proposal notification
1✔
3568
            // for block 1 and must accept this by requesting the block
1✔
3569
            // proposal from peer.
1✔
3570
            let TestSetup {
1✔
3571
                peer_broadcast_tx,
1✔
3572
                mut peer_loop_handler,
1✔
3573
                to_main_rx: _,
1✔
3574
                from_main_rx,
1✔
3575
                mut peer_state,
1✔
3576
                to_main_tx,
1✔
3577
                ..
1✔
3578
            } = genesis_setup(Network::Main).await;
1✔
3579
            let block1 = fake_valid_block_for_tests(
1✔
3580
                &peer_loop_handler.global_state_lock,
1✔
3581
                StdRng::seed_from_u64(5550001).random(),
1✔
3582
            )
1✔
3583
            .await;
1✔
3584

1✔
3585
            let mock = Mock::new(vec![
1✔
3586
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
1✔
3587
                Action::Write(PeerMessage::BlockProposalRequest(
1✔
3588
                    BlockProposalRequest::new(block1.body().mast_hash()),
1✔
3589
                )),
1✔
3590
                Action::Read(PeerMessage::Bye),
1✔
3591
            ]);
1✔
3592
            peer_loop_handler
1✔
3593
                .run(mock, from_main_rx, &mut peer_state)
1✔
3594
                .await
1✔
3595
                .unwrap();
1✔
3596

1✔
3597
            drop(to_main_tx);
1✔
3598
            drop(peer_broadcast_tx);
1✔
3599
        }
1✔
3600
    }
3601

3602
    mod proof_qualities {
3603
        use strum::IntoEnumIterator;
3604

3605
        use super::*;
3606
        use crate::config_models::cli_args;
3607
        use crate::models::blockchain::transaction::Transaction;
3608
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3609
        use crate::tests::shared::mock_genesis_global_state;
3610

3611
        async fn tx_of_proof_quality(
2✔
3612
            network: Network,
2✔
3613
            quality: TransactionProofQuality,
2✔
3614
        ) -> Transaction {
2✔
3615
            let wallet_secret = WalletEntropy::devnet_wallet();
2✔
3616
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
2✔
3617
            let alice =
2✔
3618
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
2✔
3619
                    .await;
2✔
3620
            let alice = alice.lock_guard().await;
2✔
3621
            let genesis_block = alice.chain.light_state();
2✔
3622
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
2✔
3623
            let prover_capability = match quality {
2✔
3624
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
1✔
3625
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
1✔
3626
            };
3627
            alice
2✔
3628
                .create_transaction_with_prover_capability(
2✔
3629
                    vec![].into(),
2✔
3630
                    alice_key.into(),
2✔
3631
                    UtxoNotificationMedium::OffChain,
2✔
3632
                    NativeCurrencyAmount::coins(1),
2✔
3633
                    in_seven_months,
2✔
3634
                    prover_capability,
2✔
3635
                    &TritonVmJobQueue::dummy(),
2✔
3636
                )
2✔
3637
                .await
2✔
3638
                .unwrap()
2✔
3639
                .0
2✔
3640
        }
2✔
3641

3642
        #[traced_test]
×
3643
        #[tokio::test]
3644
        async fn client_favors_higher_proof_quality() {
1✔
3645
            // In this scenario the peer is informed of a transaction that it
1✔
3646
            // already knows, and it's tested that it checks the proof quality
1✔
3647
            // field and verifies that it exceeds the proof in the mempool
1✔
3648
            // before requesting the transasction.
1✔
3649
            let network = Network::Main;
1✔
3650
            let proof_collection_tx =
1✔
3651
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
1✔
3652
            let single_proof_tx =
1✔
3653
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
1✔
3654

1✔
3655
            for (own_tx_pq, new_tx_pq) in
4✔
3656
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
1✔
3657
            {
1✔
3658
                use TransactionProofQuality::*;
1✔
3659

1✔
3660
                let (
1✔
3661
                    _peer_broadcast_tx,
4✔
3662
                    from_main_rx_clone,
4✔
3663
                    to_main_tx,
4✔
3664
                    mut to_main_rx1,
4✔
3665
                    mut alice,
4✔
3666
                    handshake_data,
4✔
3667
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
4✔
3668
                    .await
4✔
3669
                    .unwrap();
4✔
3670

1✔
3671
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
4✔
3672
                    (ProofCollection, ProofCollection) => {
1✔
3673
                        (&proof_collection_tx, &proof_collection_tx)
1✔
3674
                    }
1✔
3675
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
1✔
3676
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
1✔
3677
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
1✔
3678
                };
1✔
3679

1✔
3680
                alice
4✔
3681
                    .lock_guard_mut()
4✔
3682
                    .await
4✔
3683
                    .mempool_insert(own_tx.to_owned(), TransactionOrigin::Foreign)
4✔
3684
                    .await;
4✔
3685

1✔
3686
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
4✔
3687

4✔
3688
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4✔
3689
                let mock = if own_proof_is_supreme {
4✔
3690
                    Mock::new(vec![
3✔
3691
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3✔
3692
                        Action::Read(PeerMessage::Bye),
3✔
3693
                    ])
3✔
3694
                } else {
1✔
3695
                    Mock::new(vec![
1✔
3696
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3697
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3698
                        Action::Read(PeerMessage::Transaction(Box::new(
1✔
3699
                            new_tx.try_into().unwrap(),
1✔
3700
                        ))),
1✔
3701
                        Action::Read(PeerMessage::Bye),
1✔
3702
                    ])
1✔
3703
                };
1✔
3704

1✔
3705
                let now = proof_collection_tx.kernel.timestamp;
4✔
3706
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4✔
3707
                    to_main_tx,
4✔
3708
                    alice.clone(),
4✔
3709
                    get_dummy_socket_address(0),
4✔
3710
                    handshake_data.clone(),
4✔
3711
                    true,
4✔
3712
                    1,
4✔
3713
                    now,
4✔
3714
                );
4✔
3715
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4✔
3716

4✔
3717
                peer_loop_handler
4✔
3718
                    .run(mock, from_main_rx_clone, &mut peer_state)
4✔
3719
                    .await
4✔
3720
                    .unwrap();
4✔
3721

4✔
3722
                if own_proof_is_supreme {
4✔
3723
                    match to_main_rx1.try_recv() {
3✔
3724
                        Err(TryRecvError::Empty) => (),
3✔
3725
                        Err(TryRecvError::Disconnected) => {
1✔
3726
                            panic!("to_main channel must still be open")
1✔
3727
                        }
1✔
3728
                        Ok(_) => panic!("to_main channel must be empty"),
1✔
3729
                    }
1✔
3730
                } else {
1✔
3731
                    match to_main_rx1.try_recv() {
1✔
3732
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
1✔
3733
                        Err(TryRecvError::Disconnected) => {
1✔
3734
                            panic!("to_main channel must still be open")
1✔
3735
                        }
1✔
3736
                        Ok(PeerTaskToMain::Transaction(_)) => (),
1✔
3737
                        _ => panic!("Unexpected result from channel"),
1✔
3738
                    }
1✔
3739
                }
1✔
3740
            }
1✔
3741
        }
1✔
3742
    }
3743

3744
    mod sync_challenges {
3745
        use super::*;
3746
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3747

3748
        #[traced_test]
×
3749
        #[tokio::test]
3750
        async fn bad_sync_challenge_height_greater_than_tip() {
1✔
3751
            // Criterium: Challenge height may not exceed that of tip in the
1✔
3752
            // request.
1✔
3753

1✔
3754
            let network = Network::Main;
1✔
3755
            let (
1✔
3756
                _alice_main_to_peer_tx,
1✔
3757
                alice_main_to_peer_rx,
1✔
3758
                alice_peer_to_main_tx,
1✔
3759
                alice_peer_to_main_rx,
1✔
3760
                mut alice,
1✔
3761
                alice_hsd,
1✔
3762
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
3763
                .await
1✔
3764
                .unwrap();
1✔
3765
            let genesis_block: Block = Block::genesis(network);
1✔
3766
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
1✔
3767
                &genesis_block,
1✔
3768
                Timestamp::hours(1),
1✔
3769
                [0u8; 32],
1✔
3770
            )
1✔
3771
            .await;
1✔
3772
            for block in &blocks {
12✔
3773
                alice.set_new_tip(block.clone()).await.unwrap();
11✔
3774
            }
1✔
3775

1✔
3776
            let bh12 = blocks.last().unwrap().header().height;
1✔
3777
            let sync_challenge = SyncChallenge {
1✔
3778
                tip_digest: blocks[9].hash(),
1✔
3779
                challenges: [bh12; 10],
1✔
3780
            };
1✔
3781
            let alice_p2p_messages = Mock::new(vec![
1✔
3782
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3783
                Action::Read(PeerMessage::Bye),
1✔
3784
            ]);
1✔
3785

1✔
3786
            let peer_address = get_dummy_socket_address(0);
1✔
3787
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3788
                alice_peer_to_main_tx.clone(),
1✔
3789
                alice.clone(),
1✔
3790
                peer_address,
1✔
3791
                alice_hsd,
1✔
3792
                false,
1✔
3793
                1,
1✔
3794
            );
1✔
3795
            alice_peer_loop_handler
1✔
3796
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3797
                .await
1✔
3798
                .unwrap();
1✔
3799

1✔
3800
            drop(alice_peer_to_main_rx);
1✔
3801

1✔
3802
            let latest_sanction = alice
1✔
3803
                .lock_guard()
1✔
3804
                .await
1✔
3805
                .net
1✔
3806
                .get_peer_standing_from_database(peer_address.ip())
1✔
3807
                .await
1✔
3808
                .unwrap();
1✔
3809
            assert_eq!(
1✔
3810
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3811
                latest_sanction
1✔
3812
                    .latest_punishment
1✔
3813
                    .expect("peer must be sanctioned")
1✔
3814
                    .0
1✔
3815
            );
1✔
3816
        }
1✔
3817

3818
        #[traced_test]
×
3819
        #[tokio::test]
3820
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
1✔
3821
            // Criterium: Challenge may not point to genesis block, or block 1, as
1✔
3822
            // tip.
1✔
3823

1✔
3824
            let network = Network::Main;
1✔
3825
            let genesis_block: Block = Block::genesis(network);
1✔
3826

1✔
3827
            let alice_cli = cli_args::Args::default();
1✔
3828
            let (
1✔
3829
                _alice_main_to_peer_tx,
1✔
3830
                alice_main_to_peer_rx,
1✔
3831
                alice_peer_to_main_tx,
1✔
3832
                alice_peer_to_main_rx,
1✔
3833
                alice,
1✔
3834
                alice_hsd,
1✔
3835
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
1✔
3836

1✔
3837
            let sync_challenge = SyncChallenge {
1✔
3838
                tip_digest: genesis_block.hash(),
1✔
3839
                challenges: [BlockHeight::genesis(); 10],
1✔
3840
            };
1✔
3841

1✔
3842
            let alice_p2p_messages = Mock::new(vec![
1✔
3843
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3844
                Action::Read(PeerMessage::Bye),
1✔
3845
            ]);
1✔
3846

1✔
3847
            let peer_address = get_dummy_socket_address(0);
1✔
3848
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3849
                alice_peer_to_main_tx.clone(),
1✔
3850
                alice.clone(),
1✔
3851
                peer_address,
1✔
3852
                alice_hsd,
1✔
3853
                false,
1✔
3854
                1,
1✔
3855
            );
1✔
3856
            alice_peer_loop_handler
1✔
3857
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3858
                .await
1✔
3859
                .unwrap();
1✔
3860

1✔
3861
            drop(alice_peer_to_main_rx);
1✔
3862

1✔
3863
            let latest_sanction = alice
1✔
3864
                .lock_guard()
1✔
3865
                .await
1✔
3866
                .net
1✔
3867
                .get_peer_standing_from_database(peer_address.ip())
1✔
3868
                .await
1✔
3869
                .unwrap();
1✔
3870
            assert_eq!(
1✔
3871
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3872
                latest_sanction
1✔
3873
                    .latest_punishment
1✔
3874
                    .expect("peer must be sanctioned")
1✔
3875
                    .0
1✔
3876
            );
1✔
3877
        }
1✔
3878

3879
        #[traced_test]
×
3880
        #[tokio::test]
3881
        async fn sync_challenge_happy_path() -> Result<()> {
1✔
3882
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
1✔
3883
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
1✔
3884
            // sync mode.
1✔
3885

1✔
3886
            let mut rng = rand::rng();
1✔
3887
            let network = Network::Main;
1✔
3888
            let genesis_block: Block = Block::genesis(network);
1✔
3889

1✔
3890
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
1✔
3891
            let alice_cli = cli_args::Args {
1✔
3892
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
1✔
3893
                ..Default::default()
1✔
3894
            };
1✔
3895
            let (
1✔
3896
                _alice_main_to_peer_tx,
1✔
3897
                alice_main_to_peer_rx,
1✔
3898
                alice_peer_to_main_tx,
1✔
3899
                mut alice_peer_to_main_rx,
1✔
3900
                mut alice,
1✔
3901
                alice_hsd,
1✔
3902
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
1✔
3903
            let _alice_socket_address = get_dummy_socket_address(0);
1✔
3904

1✔
3905
            let (
1✔
3906
                _bob_main_to_peer_tx,
1✔
3907
                _bob_main_to_peer_rx,
1✔
3908
                _bob_peer_to_main_tx,
1✔
3909
                _bob_peer_to_main_rx,
1✔
3910
                mut bob,
1✔
3911
                _bob_hsd,
1✔
3912
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3913
            let bob_socket_address = get_dummy_socket_address(0);
1✔
3914

1✔
3915
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
3916
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
1✔
3917
            assert!(
1✔
3918
                block_1.is_valid(&genesis_block, now).await,
1✔
3919
                "Block must be valid for this test to make sense"
1✔
3920
            );
1✔
3921
            let alice_tip = &block_1;
1✔
3922
            alice.set_new_tip(block_1.clone()).await?;
1✔
3923
            bob.set_new_tip(block_1.clone()).await?;
1✔
3924

1✔
3925
            // produce enough blocks to ensure alice needs to go into sync mode
1✔
3926
            // with this block notification.
1✔
3927
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
1✔
3928
                &block_1,
1✔
3929
                TARGET_BLOCK_INTERVAL,
1✔
3930
                rng.random(),
1✔
3931
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
1✔
3932
            )
1✔
3933
            .await;
1✔
3934
            for block in &blocks {
16✔
3935
                bob.set_new_tip(block.clone()).await?;
15✔
3936
            }
1✔
3937
            let bob_tip = blocks.last().unwrap();
1✔
3938

1✔
3939
            let block_notification_from_bob = PeerBlockNotification {
1✔
3940
                hash: bob_tip.hash(),
1✔
3941
                height: bob_tip.header().height,
1✔
3942
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
1✔
3943
            };
1✔
3944

1✔
3945
            let alice_rng_seed = rng.random::<[u8; 32]>();
1✔
3946
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
1✔
3947
            let sync_challenge_from_alice = SyncChallenge::generate(
1✔
3948
                &block_notification_from_bob,
1✔
3949
                alice_tip.header().height,
1✔
3950
                alice_rng_clone.random(),
1✔
3951
            );
1✔
3952

1✔
3953
            println!(
1✔
3954
                "sync challenge from alice:\n{:?}",
1✔
3955
                sync_challenge_from_alice
1✔
3956
            );
1✔
3957

1✔
3958
            let sync_challenge_response_from_bob = bob
1✔
3959
                .lock_guard()
1✔
3960
                .await
1✔
3961
                .response_to_sync_challenge(sync_challenge_from_alice)
1✔
3962
                .await
1✔
3963
                .expect("should be able to respond to sync challenge");
1✔
3964

1✔
3965
            let alice_p2p_messages = Mock::new(vec![
1✔
3966
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
3967
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
1✔
3968
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
1✔
3969
                    sync_challenge_response_from_bob,
1✔
3970
                ))),
1✔
3971
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
3972
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
1✔
3973
                // The absence of a Write here checks that a 2nd challenge isn't sent
1✔
3974
                // when a successful was just received.
1✔
3975
                Action::Read(PeerMessage::Bye),
1✔
3976
            ]);
1✔
3977

1✔
3978
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3979
                alice_peer_to_main_tx.clone(),
1✔
3980
                alice.clone(),
1✔
3981
                bob_socket_address,
1✔
3982
                alice_hsd,
1✔
3983
                false,
1✔
3984
                1,
1✔
3985
                bob_tip.header().timestamp,
1✔
3986
            );
1✔
3987
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
1✔
3988
            alice_peer_loop_handler
1✔
3989
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3990
                .await?;
1✔
3991

1✔
3992
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
1✔
3993
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
1✔
3994
            expected_anchor_mmra.append(bob_tip.hash());
1✔
3995
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
3996
                peer_address: bob_socket_address,
1✔
3997
                claimed_height: bob_tip.header().height,
1✔
3998
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
1✔
3999
                claimed_block_mmra: expected_anchor_mmra,
1✔
4000
            };
1✔
4001
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
1✔
4002
            assert_eq!(
1✔
4003
                expected_message_from_alice_peer_loop,
1✔
4004
                observed_message_from_alice_peer_loop
1✔
4005
            );
1✔
4006

1✔
4007
            Ok(())
1✔
4008
        }
1✔
4009
    }
4010
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc