• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14133619659

28 Mar 2025 04:35PM UTC coverage: 84.452% (-0.03%) from 84.479%
14133619659

push

github

dan-da
fix: avoid holding read-lock across peer.send()

Addresses #534 (hanging node).

With this fix we obtain the block in a standalone statement so the
read-guard gets dropped before the match statement.

proof: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=4394469e33398e6970a6c9d3562b0aaf

0 of 3 new or added lines in 1 file covered. (0.0%)

18 existing lines in 5 files now uncovered.

51866 of 61415 relevant lines covered (84.45%)

175766.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.88
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the default number of blocks in a block batch — confirm against its users.
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
66
/// Maximum number of peers in a peer list response. Own responses are capped
/// at this length, and peers sending longer lists are sanctioned.
const MAX_PEER_LIST_LENGTH: usize = 10;
67
/// NOTE(review): not referenced in the visible part of this file; presumably
/// the smallest number of blocks accepted for a block batch — confirm.
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
68

69
/// Return value of the peer message handler signalling that the connection to
/// the peer should stay open.
const KEEP_CONNECTION_ALIVE: bool = false;
70
/// Return value of the peer message handler signalling that the connection to
/// the peer should be closed.
const DISCONNECT_CONNECTION: bool = true;
71

72
/// NOTE(review): presumably the numeric type used to express a peer's
/// standing; not used in the visible part of this file — confirm.
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from a peer received via TCP.
75
///
76
/// Also handles messages from the main task over the main-to-peer-tasks
77
/// broadcast channel.
78
#[derive(Debug, Clone)]
79
pub struct PeerLoopHandler {
80
    /// Channel for sending messages from this peer task to the main task.
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
81
    /// Handle to the lock-guarded global state; read and write guards are
    /// acquired through it.
    global_state_lock: GlobalStateLock,
82
    /// Socket address of the peer; used as the key into the peer map and in
    /// log messages.
    peer_address: SocketAddr,
83
    /// NOTE(review): presumably the handshake data received from the peer when
    /// the connection was established — not read in the visible part of this
    /// file; confirm.
    peer_handshake_data: HandshakeData,
84
    /// NOTE(review): presumably `true` if the peer initiated the connection —
    /// not read in the visible part of this file; confirm.
    inbound_connection: bool,
85
    /// Distance of this peer. Peers revealed by this peer are reported to the
    /// main task with a distance of `distance + 1`.
    distance: u8,
86
    /// Source of randomness, e.g., for generating sync challenges.
    rng: StdRng,
87
    /// Test-only override for the current time; if set, `now()` returns this
    /// value instead of the actual current time.
    #[cfg(test)]
88
    mock_now: Option<Timestamp>,
89
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
22✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
22✔
94
        global_state_lock: GlobalStateLock,
22✔
95
        peer_address: SocketAddr,
22✔
96
        peer_handshake_data: HandshakeData,
22✔
97
        inbound_connection: bool,
22✔
98
        distance: u8,
22✔
99
    ) -> Self {
22✔
100
        Self {
22✔
101
            to_main_tx,
22✔
102
            global_state_lock,
22✔
103
            peer_address,
22✔
104
            peer_handshake_data,
22✔
105
            inbound_connection,
22✔
106
            distance,
22✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
22✔
108
            #[cfg(test)]
22✔
109
            mock_now: None,
22✔
110
        }
22✔
111
    }
22✔
112

113
    /// Like [`Self::new`], but with the handler's clock pinned to
    /// `mocked_time`, such that time-dependent behavior can be tested.
    #[cfg(test)]
    pub(crate) fn with_mocked_time(
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
        global_state_lock: GlobalStateLock,
        peer_address: SocketAddr,
        peer_handshake_data: HandshakeData,
        inbound_connection: bool,
        distance: u8,
        mocked_time: Timestamp,
    ) -> Self {
        let mut handler = Self::new(
            to_main_tx,
            global_state_lock,
            peer_address,
            peer_handshake_data,
            inbound_connection,
            distance,
        );
        handler.mock_now = Some(mocked_time);

        handler
    }
20✔
135

136
    /// Replace the handler's random number generator with the given one.
    ///
    /// Handy for making tests deterministic.
    #[cfg(test)]
    fn set_rng(&mut self, rng: StdRng) {
        // The previously installed generator is dropped right away.
        drop(std::mem::replace(&mut self.rng, rng));
    }
1✔
143

144
    fn now(&self) -> Timestamp {
27✔
145
        #[cfg(not(test))]
27✔
146
        {
27✔
147
            Timestamp::now()
27✔
148
        }
27✔
149
        #[cfg(test)]
27✔
150
        {
27✔
151
            self.mock_now.unwrap_or(Timestamp::now())
27✔
152
        }
27✔
153
    }
27✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
7✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
7✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
7✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
7✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
6✔
199
        if sanction_result.is_err() {
6✔
200
            error!("Cannot reward banned peer");
×
201
        }
6✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
6✔
204
    }
7✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

16✔
235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
255
    /// valid. Use with a list of blocks or a single block. When the
256
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
257
    /// list is the `i`th block. The parent of element zero in this list is
258
    /// `parent_of_first_block`.
259
    ///
260
    /// # Return Value
261
    ///  - `Err` when the connection should be closed;
262
    ///  - `Ok(None)` if some block is invalid
263
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
264
    ///    are not syncing;
265
    ///  - `Ok(None)` if the last block has insufficient height and we are
266
    ///    syncing;
267
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
268
    ///    highest height in the batch.
269
    ///
270
    /// A return value of Ok(Some(_)) means that the message was passed on to
271
    /// main loop.
272
    ///
273
    /// # Locking
274
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
275
    ///     `self.reward(..)`.
276
    ///
277
    /// # Panics
278
    ///
279
    ///  - Panics if called with the empty list.
280
    async fn handle_blocks(
8✔
281
        &mut self,
8✔
282
        received_blocks: Vec<Block>,
8✔
283
        parent_of_first_block: Block,
8✔
284
    ) -> Result<Option<BlockHeight>> {
8✔
285
        debug!(
8✔
286
            "attempting to validate {} {}",
×
287
            received_blocks.len(),
×
288
            if received_blocks.len() == 1 {
×
289
                "block"
×
290
            } else {
291
                "blocks"
×
292
            }
293
        );
294
        let now = self.now();
8✔
295
        debug!("validating with respect to current timestamp {now}");
8✔
296
        let mut previous_block = &parent_of_first_block;
8✔
297
        for new_block in &received_blocks {
24✔
298
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
17✔
299
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
17✔
300
            let new_block_is_valid = new_block.is_valid(previous_block, now).await;
17✔
301
            debug!("new block is valid? {new_block_is_valid}");
17✔
302
            if !new_block_has_proof_of_work {
17✔
303
                warn!(
1✔
304
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
305
                    new_block.kernel.header.height, self.peer_address
×
306
                );
307
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
308
                warn!(
1✔
309
                    "Proof of work should be {} (or more) but was [{}].",
×
310
                    previous_block.kernel.header.difficulty.target(),
×
311
                    new_block.hash().values().iter().join(", ")
×
312
                );
313
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
314
                    new_block.kernel.header.height,
1✔
315
                    new_block.hash(),
1✔
316
                )))
1✔
317
                .await?;
1✔
318
                warn!("Failed to validate block due to insufficient PoW");
1✔
319
                return Ok(None);
1✔
320
            } else if !new_block_is_valid {
16✔
321
                warn!(
×
322
                    "Received invalid block of height {} from peer with IP {}",
×
323
                    new_block.kernel.header.height, self.peer_address
×
324
                );
325
                self.punish(NegativePeerSanction::InvalidBlock((
×
326
                    new_block.kernel.header.height,
×
327
                    new_block.hash(),
×
328
                )))
×
329
                .await?;
×
330
                warn!("Failed to validate block: invalid block");
×
331
                return Ok(None);
×
332
            }
16✔
333
            info!(
16✔
334
                "Block with height {} is valid. mined: {}",
×
335
                new_block.kernel.header.height,
×
336
                new_block.kernel.header.timestamp.standard_format()
×
337
            );
338

339
            previous_block = new_block;
16✔
340
        }
341

342
        // evaluate the fork choice rule
343
        debug!("Checking last block's canonicity ...");
7✔
344
        let last_block = received_blocks.last().unwrap();
7✔
345
        let is_canonical = self
7✔
346
            .global_state_lock
7✔
347
            .lock_guard()
7✔
348
            .await
7✔
349
            .incoming_block_is_more_canonical(last_block);
7✔
350
        let last_block_height = last_block.header().height;
7✔
351
        let sync_mode_active_and_have_new_champion = self
7✔
352
            .global_state_lock
7✔
353
            .lock_guard()
7✔
354
            .await
7✔
355
            .net
356
            .sync_anchor
357
            .as_ref()
7✔
358
            .is_some_and(|x| {
7✔
359
                x.champion
×
360
                    .is_none_or(|(height, _)| height < last_block_height)
×
361
            });
7✔
362
        if !is_canonical && !sync_mode_active_and_have_new_champion {
7✔
363
            warn!(
1✔
364
                "Received {} blocks from peer but incoming blocks are less \
×
365
            canonical than current tip, or current sync-champion.",
×
366
                received_blocks.len()
×
367
            );
368
            return Ok(None);
1✔
369
        }
6✔
370

6✔
371
        // Send the new blocks to the main task which handles the state update
6✔
372
        // and storage to the database.
6✔
373
        let number_of_received_blocks = received_blocks.len();
6✔
374
        self.to_main_tx
6✔
375
            .send(PeerTaskToMain::NewBlocks(received_blocks))
6✔
376
            .await?;
6✔
377
        info!(
6✔
378
            "Updated block info by block from peer. block height {}",
×
379
            last_block_height
380
        );
381

382
        // Valuable, new, hard-to-produce information. Reward peer.
383
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
6✔
384
            .await?;
6✔
385

386
        Ok(Some(last_block_height))
6✔
387
    }
8✔
388

389
    /// Take a single block received from a peer and (attempt to) find a path
390
    /// between the received block and a common ancestor stored in the blocks
391
    /// database.
392
    ///
393
    /// This function attempts to find the parent of the received block, either
394
    /// by searching the database or by requesting it from a peer.
395
    ///  - If the parent is not stored, it is requested from the peer and the
396
    ///    received block is pushed to the fork reconciliation list for later
397
    ///    handling by this function. The fork reconciliation list starts out
398
    ///    empty, but grows as more parents are requested and transmitted.
399
    ///  - If the parent is found in the database, a) block handling continues:
400
    ///    the entire list of fork reconciliation blocks are passed down the
401
    ///    pipeline, potentially leading to a state update; and b) the fork
402
    ///    reconciliation list is cleared.
403
    ///
404
    /// Locking:
405
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
406
    ///     `self.reward(..)`.
407
    async fn try_ensure_path<S>(
20✔
408
        &mut self,
20✔
409
        received_block: Box<Block>,
20✔
410
        peer: &mut S,
20✔
411
        peer_state: &mut MutablePeerState,
20✔
412
    ) -> Result<()>
20✔
413
    where
20✔
414
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
20✔
415
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
20✔
416
        <S as TryStream>::Error: std::error::Error,
20✔
417
    {
20✔
418
        // Does the received block match the fork reconciliation list?
419
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
20✔
420
            peer_state.fork_reconciliation_blocks.last()
20✔
421
        {
422
            let valid = successor
10✔
423
                .is_valid(received_block.as_ref(), self.now())
10✔
424
                .await;
10✔
425
            if !valid {
10✔
426
                warn!(
×
427
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
428
                        peer_state.fork_reconciliation_blocks.len() + 1
×
429
                    );
430
            }
10✔
431
            valid
10✔
432
        } else {
433
            true
10✔
434
        };
435

436
        // Are we running out of RAM?
437
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
20✔
438
            >= self.global_state_lock.cli().sync_mode_threshold;
20✔
439
        if too_many_blocks {
20✔
440
            warn!(
1✔
441
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
442
                peer_state.fork_reconciliation_blocks.len() + 1
×
443
            );
444
        }
19✔
445

446
        // Block mismatch or too many blocks: abort!
447
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
20✔
448
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
449
                received_block.header().height,
1✔
450
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
451
                received_block.hash(),
1✔
452
            )))
1✔
453
            .await?;
1✔
454
            peer_state.fork_reconciliation_blocks = vec![];
1✔
455
            return Ok(());
1✔
456
        }
19✔
457

19✔
458
        // otherwise, append
19✔
459
        peer_state.fork_reconciliation_blocks.push(*received_block);
19✔
460

19✔
461
        // Try fetch parent
19✔
462
        let received_block_header = *peer_state
19✔
463
            .fork_reconciliation_blocks
19✔
464
            .last()
19✔
465
            .unwrap()
19✔
466
            .header();
19✔
467

19✔
468
        let parent_digest = received_block_header.prev_block_digest;
19✔
469
        let parent_height = received_block_header.height.previous()
19✔
470
            .expect("transferred block must have previous height because genesis block cannot be transferred");
19✔
471
        debug!("Try ensure path: fetching parent block");
19✔
472
        let parent_block = self
19✔
473
            .global_state_lock
19✔
474
            .lock_guard()
19✔
475
            .await
19✔
476
            .chain
477
            .archival_state()
19✔
478
            .get_block(parent_digest)
19✔
479
            .await?;
19✔
480
        debug!(
19✔
481
            "Completed parent block fetching from DB: {}",
×
482
            if parent_block.is_some() {
×
483
                "found".to_string()
×
484
            } else {
485
                "not found".to_string()
×
486
            }
487
        );
488

489
        // If parent is not known (but not genesis) request it.
490
        let Some(parent_block) = parent_block else {
19✔
491
            if parent_height.is_genesis() {
11✔
492
                peer_state.fork_reconciliation_blocks.clear();
1✔
493
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
494
                return Ok(());
×
495
            }
10✔
496
            info!(
10✔
497
                "Parent not known: Requesting previous block with height {} from peer",
×
498
                parent_height
499
            );
500

501
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
502
                .await?;
10✔
503

504
            return Ok(());
10✔
505
        };
506

507
        // We want to treat the received fork reconciliation blocks (plus the
508
        // received block) in reverse order, from oldest to newest, because
509
        // they were requested from high to low block height.
510
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
8✔
511
        new_blocks.reverse();
8✔
512

8✔
513
        // Reset the fork resolution state since we got all the way back to a
8✔
514
        // block that we have.
8✔
515
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
8✔
516
        peer_state.fork_reconciliation_blocks.clear();
8✔
517

518
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
8✔
519
            // If `BlockNotification` was received during a block reconciliation
520
            // event, then the peer might have one (or more (unlikely)) blocks
521
            // that we do not have. We should thus request those blocks.
522
            if fork_reconciliation_event
6✔
523
                && peer_state.highest_shared_block_height > new_block_height
6✔
524
            {
525
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
526
                    peer_state.highest_shared_block_height,
1✔
527
                ))
1✔
528
                .await?;
1✔
529
            }
5✔
530
        }
2✔
531

532
        Ok(())
8✔
533
    }
20✔
534

535
    /// Handle peer messages and returns Ok(true) if connection should be closed.
536
    /// Connection should also be closed if an error is returned.
537
    /// Otherwise, returns OK(false).
538
    ///
539
    /// Locking:
540
    ///   * Acquires `global_state_lock` for read.
541
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
542
    ///     `self.reward(..)`.
543
    async fn handle_peer_message<S>(
93✔
544
        &mut self,
93✔
545
        msg: PeerMessage,
93✔
546
        peer: &mut S,
93✔
547
        peer_state_info: &mut MutablePeerState,
93✔
548
    ) -> Result<bool>
93✔
549
    where
93✔
550
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
93✔
551
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
93✔
552
        <S as TryStream>::Error: std::error::Error,
93✔
553
    {
93✔
554
        debug!(
93✔
555
            "Received {} from peer {}",
×
556
            msg.get_type(),
×
557
            self.peer_address
558
        );
559
        match msg {
93✔
560
            PeerMessage::Bye => {
561
                // Note that the current peer is not removed from the global_state.peer_map here
562
                // but that this is done by the caller.
563
                info!("Got bye. Closing connection to peer");
40✔
564
                Ok(DISCONNECT_CONNECTION)
40✔
565
            }
566
            PeerMessage::PeerListRequest => {
567
                let peer_info = {
2✔
568
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
2✔
569

570
                    // We are interested in the address on which peers accept ingoing connections,
571
                    // not in the address in which they are connected to us. We are only interested in
572
                    // peers that accept incoming connections.
573
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
2✔
574
                        .global_state_lock
2✔
575
                        .lock_guard()
2✔
576
                        .await
2✔
577
                        .net
578
                        .peer_map
579
                        .values()
2✔
580
                        .filter(|peer_info| peer_info.listen_address().is_some())
5✔
581
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
2✔
582
                        .map(|peer_info| {
5✔
583
                            (
5✔
584
                                // unwrap is safe bc of above `filter`
5✔
585
                                peer_info.listen_address().unwrap(),
5✔
586
                                peer_info.instance_id(),
5✔
587
                            )
5✔
588
                        })
5✔
589
                        .collect();
2✔
590

2✔
591
                    // We sort the returned list, so this function is easier to test
2✔
592
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
593
                    peer_info
2✔
594
                };
2✔
595

2✔
596
                debug!("Responding with: {:?}", peer_info);
2✔
597
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
2✔
598
                Ok(KEEP_CONNECTION_ALIVE)
2✔
599
            }
600
            PeerMessage::PeerListResponse(peers) => {
×
601
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
×
602

×
603
                if peers.len() > MAX_PEER_LIST_LENGTH {
×
604
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
605
                        .await?;
×
606
                }
×
607
                self.to_main_tx
×
608
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
×
609
                        peers,
×
610
                        self.peer_address,
×
611
                        // The distance to the revealed peers is 1 + this peer's distance
×
612
                        self.distance + 1,
×
613
                    )))
×
614
                    .await?;
×
615
                Ok(KEEP_CONNECTION_ALIVE)
×
616
            }
617
            PeerMessage::BlockNotificationRequest => {
618
                debug!("Got BlockNotificationRequest");
×
619

620
                peer.send(PeerMessage::BlockNotification(
×
621
                    self.global_state_lock
×
622
                        .lock_guard()
×
623
                        .await
×
624
                        .chain
625
                        .light_state()
×
626
                        .into(),
×
627
                ))
×
628
                .await?;
×
629

630
                Ok(KEEP_CONNECTION_ALIVE)
×
631
            }
632
            PeerMessage::BlockNotification(block_notification) => {
4✔
633
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
634

635
                let (tip_header, sync_anchor_is_set) = {
4✔
636
                    let state = self.global_state_lock.lock_guard().await;
4✔
637
                    (
4✔
638
                        *state.chain.light_state().header(),
4✔
639
                        state.net.sync_anchor.is_some(),
4✔
640
                    )
4✔
641
                };
4✔
642
                debug!(
4✔
643
                    "Got BlockNotification of height {}. Own height is {}",
×
644
                    block_notification.height, tip_header.height
645
                );
646

647
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
4✔
648
                let now = self.now();
4✔
649
                let time_since_latest_successful_challenge = peer_state_info
4✔
650
                    .successful_sync_challenge_response_time
4✔
651
                    .map(|then| now - then);
4✔
652
                let cooldown_expired = time_since_latest_successful_challenge
4✔
653
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
4✔
654
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
4✔
655
                    &tip_header,
4✔
656
                    block_notification.height,
4✔
657
                    block_notification.cumulative_proof_of_work,
4✔
658
                    sync_mode_threshold,
4✔
659
                );
4✔
660
                if cooldown_expired && exceeds_sync_mode_threshold {
4✔
661
                    debug!("sync mode criterion satisfied.");
1✔
662

663
                    if peer_state_info.sync_challenge.is_some() {
1✔
664
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
665
                        return Ok(KEEP_CONNECTION_ALIVE);
×
666
                    }
1✔
667

1✔
668
                    info!(
1✔
669
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
670
                    );
671
                    let challenge = SyncChallenge::generate(
1✔
672
                        &block_notification,
1✔
673
                        tip_header.height,
1✔
674
                        self.rng.random(),
1✔
675
                    );
1✔
676
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
677
                        challenge,
1✔
678
                        block_notification.cumulative_proof_of_work,
1✔
679
                        self.now(),
1✔
680
                    ));
1✔
681

1✔
682
                    debug!("sending challenge ...");
1✔
683
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
684

685
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
686
                }
3✔
687

3✔
688
                peer_state_info.highest_shared_block_height = block_notification.height;
3✔
689
                let block_is_new = tip_header.cumulative_proof_of_work
3✔
690
                    < block_notification.cumulative_proof_of_work;
3✔
691

3✔
692
                debug!("block_is_new: {}", block_is_new);
3✔
693

694
                if block_is_new
3✔
695
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
3✔
696
                    && !sync_anchor_is_set
2✔
697
                    && !exceeds_sync_mode_threshold
2✔
698
                {
699
                    debug!(
1✔
700
                        "sending BlockRequestByHeight to peer for block with height {}",
×
701
                        block_notification.height
702
                    );
703
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
1✔
704
                        .await?;
1✔
705
                } else {
706
                    debug!(
2✔
707
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
708
                        block_notification.height,
×
709
                        block_is_new,
×
710
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
711
                    );
712
                }
713

714
                Ok(KEEP_CONNECTION_ALIVE)
3✔
715
            }
716
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
717
                let response = {
×
718
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
719

2✔
720
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
721

722
                    let response = self
2✔
723
                        .global_state_lock
2✔
724
                        .lock_guard()
2✔
725
                        .await
2✔
726
                        .response_to_sync_challenge(sync_challenge)
2✔
727
                        .await;
2✔
728

729
                    match response {
2✔
730
                        Ok(resp) => resp,
×
731
                        Err(e) => {
2✔
732
                            warn!("could not generate sync challenge response:\n{e}");
2✔
733
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
734
                                .await?;
2✔
735
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
736
                        }
737
                    }
738
                };
739

740
                info!(
×
741
                    "Responding to sync challenge from {}",
×
742
                    self.peer_address.ip()
×
743
                );
744
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
745
                    .await?;
×
746

747
                Ok(KEEP_CONNECTION_ALIVE)
×
748
            }
749
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
750
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
751

752
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
753
                info!(
1✔
754
                    "Got sync challenge response from {}",
×
755
                    self.peer_address.ip()
×
756
                );
757

758
                // The purpose of the sync challenge and sync challenge response
759
                // is to avoid going into sync mode based on a malicious target
760
                // fork. Instead of verifying that the claimed proof-of-work
761
                // number is correct (which would require sending and verifying,
762
                // at least, all blocks between luca (whatever that is) and the
763
                // claimed tip), we use a heuristic that requires less
764
                // communication and less verification work. The downside of
765
                // using a heuristic here is a nonzero false positive and false
766
                // negative rate. Note that the false negative event
767
                // (maliciously sending someone into sync mode based on a bogus
768
                // fork) still requires a significant amount of work from the
769
                // attacker, *in addition* to being lucky. Also, down the line
770
                // succinctness (and more specifically, recursive block
771
                // validation) obviates this entire subprotocol.
772

773
                // Did we issue a challenge?
774
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
775
                    warn!("Sync challenge response was not prompted.");
×
776
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
777
                        .await?;
×
778
                    return Ok(KEEP_CONNECTION_ALIVE);
×
779
                };
780

781
                // Reset the challenge, regardless of the response's success.
782
                peer_state_info.sync_challenge = None;
1✔
783

1✔
784
                // Does response match issued challenge?
1✔
785
                if !challenge_response.matches(issued_challenge) {
1✔
786
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
787
                        .await?;
×
788
                    return Ok(KEEP_CONNECTION_ALIVE);
×
789
                }
1✔
790

1✔
791
                // Does response verify?
1✔
792
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
793
                let now = self.now();
1✔
794
                if !challenge_response.is_valid(now).await {
1✔
795
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
796
                        .await?;
×
797
                    return Ok(KEEP_CONNECTION_ALIVE);
×
798
                }
1✔
799

800
                // Does cumulative proof-of-work evolve reasonably?
801
                let own_tip_header = *self
1✔
802
                    .global_state_lock
1✔
803
                    .lock_guard()
1✔
804
                    .await
1✔
805
                    .chain
806
                    .light_state()
1✔
807
                    .header();
1✔
808
                if !challenge_response
1✔
809
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
810
                {
811
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
812
                        .await?;
×
813
                    return Ok(KEEP_CONNECTION_ALIVE);
×
814
                }
1✔
815

1✔
816
                // Is there some specific (*i.e.*, not aggregate) proof of work?
1✔
817
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
818
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
819
                        .await?;
×
820
                    return Ok(KEEP_CONNECTION_ALIVE);
×
821
                }
1✔
822

1✔
823
                // Did it come in time?
1✔
824
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
825
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
826
                        .await?;
×
827
                    return Ok(KEEP_CONNECTION_ALIVE);
×
828
                }
1✔
829

1✔
830
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
831
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
832

1✔
833
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
834
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
835

1✔
836
                // Inform main loop
1✔
837
                self.to_main_tx
1✔
838
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
839
                        peer_address: self.peer_address,
1✔
840
                        claimed_height: claimed_tip_height,
1✔
841
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
842
                        claimed_block_mmra: sync_mmra_anchor,
1✔
843
                    })
1✔
844
                    .await?;
1✔
845

846
                Ok(KEEP_CONNECTION_ALIVE)
1✔
847
            }
848
            PeerMessage::BlockRequestByHash(block_digest) => {
×
NEW
849
                let block = self
×
850
                    .global_state_lock
×
851
                    .lock_guard()
×
852
                    .await
×
853
                    .chain
854
                    .archival_state()
×
855
                    .get_block(block_digest)
×
NEW
856
                    .await?;
×
857

NEW
858
                match block {
×
859
                    None => {
860
                        // TODO: Consider punishing here
861
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
862
                        Ok(KEEP_CONNECTION_ALIVE)
×
863
                    }
864
                    Some(b) => {
×
865
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
866
                            .await?;
×
867
                        Ok(KEEP_CONNECTION_ALIVE)
×
868
                    }
869
                }
870
            }
871
            PeerMessage::BlockRequestByHeight(block_height) => {
4✔
872
                let block_response = {
3✔
873
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
4✔
874

4✔
875
                    debug!("Got BlockRequestByHeight of height {}", block_height);
4✔
876

877
                    let canonical_block_digest = self
4✔
878
                        .global_state_lock
4✔
879
                        .lock_guard()
4✔
880
                        .await
4✔
881
                        .chain
882
                        .archival_state()
4✔
883
                        .archival_block_mmr
4✔
884
                        .ammr()
4✔
885
                        .try_get_leaf(block_height.into())
4✔
886
                        .await;
4✔
887

888
                    let canonical_block_digest = match canonical_block_digest {
4✔
889
                        None => {
890
                            let own_tip_height = self
1✔
891
                                .global_state_lock
1✔
892
                                .lock_guard()
1✔
893
                                .await
1✔
894
                                .chain
895
                                .light_state()
1✔
896
                                .header()
1✔
897
                                .height;
1✔
898
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
899
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
900
                                .await?;
1✔
901

902
                            return Ok(KEEP_CONNECTION_ALIVE);
1✔
903
                        }
904
                        Some(digest) => digest,
3✔
905
                    };
906

907
                    let canonical_chain_block: Block = self
3✔
908
                        .global_state_lock
3✔
909
                        .lock_guard()
3✔
910
                        .await
3✔
911
                        .chain
912
                        .archival_state()
3✔
913
                        .get_block(canonical_block_digest)
3✔
914
                        .await?
3✔
915
                        .unwrap();
3✔
916

3✔
917
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
3✔
918
                };
3✔
919

3✔
920
                debug!("Sending block");
3✔
921
                peer.send(block_response).await?;
3✔
922
                debug!("Sent block");
3✔
923
                Ok(KEEP_CONNECTION_ALIVE)
3✔
924
            }
925
            PeerMessage::Block(t_block) => {
20✔
926
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
20✔
927

20✔
928
                info!(
20✔
929
                    "Got new block from peer {}, height {}, mined {}",
×
930
                    self.peer_address,
×
931
                    t_block.header.height,
×
932
                    t_block.header.timestamp.standard_format()
×
933
                );
934
                let new_block_height = t_block.header.height;
20✔
935

936
                let block = match Block::try_from(*t_block) {
20✔
937
                    Ok(block) => Box::new(block),
20✔
938
                    Err(e) => {
×
939
                        warn!("Peer sent invalid block: {e:?}");
×
940
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
941
                            .await?;
×
942

943
                        return Ok(KEEP_CONNECTION_ALIVE);
×
944
                    }
945
                };
946

947
                // Update the value for the highest known height that peer possesses iff
948
                // we are not in a fork reconciliation state.
949
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
20✔
950
                    peer_state_info.highest_shared_block_height = new_block_height;
10✔
951
                }
10✔
952

953
                self.try_ensure_path(block, peer, peer_state_info).await?;
20✔
954

955
                // Reward happens as part of `try_ensure_path`
956

957
                Ok(KEEP_CONNECTION_ALIVE)
19✔
958
            }
959
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
960
                known_blocks,
8✔
961
                max_response_len,
8✔
962
                anchor,
8✔
963
            }) => {
8✔
964
                debug!(
8✔
965
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
966
                    self.peer_address
967
                );
968

969
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
970
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
971
                        .await?;
×
972

973
                    return Ok(KEEP_CONNECTION_ALIVE);
×
974
                }
8✔
975

976
                // The last block in the list of the peer's known blocks is the
977
                // earliest block, i.e. the block with the lowest height, that the
978
                // peer has requested. If it does not belong to the canonical chain,
979
                // none of the later ones will, so we can abort early in that case.
980
                let least_preferred = match known_blocks.last() {
8✔
981
                    Some(least_preferred) => *least_preferred,
8✔
982
                    None => {
983
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
984
                            .await?;
×
985

986
                        return Ok(KEEP_CONNECTION_ALIVE);
×
987
                    }
988
                };
989

990
                let state = self.global_state_lock.lock_guard().await;
8✔
991
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
992
                let luca_is_known = state
8✔
993
                    .chain
8✔
994
                    .archival_state()
8✔
995
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
996
                    .await;
8✔
997
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
998
                    drop(state);
×
999
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1000
                        .await?;
×
1001
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1002

1003
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1004
                }
8✔
1005

1006
                // Happy case: At least *one* of the blocks referenced by peer
1007
                // is known to us.
1008
                let first_block_in_response = {
8✔
1009
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1010
                    for block_digest in known_blocks {
10✔
1011
                        if state
10✔
1012
                            .chain
10✔
1013
                            .archival_state()
10✔
1014
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1015
                            .await
10✔
1016
                        {
1017
                            let height = state
8✔
1018
                                .chain
8✔
1019
                                .archival_state()
8✔
1020
                                .get_block_header(block_digest)
8✔
1021
                                .await
8✔
1022
                                .unwrap()
8✔
1023
                                .height;
8✔
1024
                            first_block_in_response = Some(height);
8✔
1025
                            debug!(
8✔
1026
                                "Found block in canonical chain for batch response: {}",
×
1027
                                block_digest
1028
                            );
1029
                            break;
8✔
1030
                        }
2✔
1031
                    }
1032

1033
                    first_block_in_response
8✔
1034
                        .expect("existence of LUCA should have been established already.")
8✔
1035
                };
8✔
1036

8✔
1037
                debug!(
8✔
1038
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1039
                 Now building response from that height."
×
1040
                );
1041

1042
                // Get the relevant blocks, at most batch-size many, descending from the
1043
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1044
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1045
                let max_response_len = cmp::min(
8✔
1046
                    max_response_len,
8✔
1047
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1048
                );
8✔
1049
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1050
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1051

8✔
1052
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1053
                let response_start_height: u64 = first_block_in_response.into();
8✔
1054
                let mut i: u64 = 1;
8✔
1055
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1056
                    let block_height = response_start_height + i;
31✔
1057
                    match state
31✔
1058
                        .chain
31✔
1059
                        .archival_state()
31✔
1060
                        .archival_block_mmr
31✔
1061
                        .ammr()
31✔
1062
                        .try_get_leaf(block_height)
31✔
1063
                        .await
31✔
1064
                    {
1065
                        Some(digest) => {
23✔
1066
                            digests_of_returned_blocks.push(digest);
23✔
1067
                        }
23✔
1068
                        None => break,
8✔
1069
                    }
1070
                    i += 1;
23✔
1071
                }
1072

1073
                let mut returned_blocks: Vec<Block> =
8✔
1074
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1075
                for block_digest in digests_of_returned_blocks {
31✔
1076
                    let block = state
23✔
1077
                        .chain
23✔
1078
                        .archival_state()
23✔
1079
                        .get_block(block_digest)
23✔
1080
                        .await?
23✔
1081
                        .unwrap();
23✔
1082
                    returned_blocks.push(block);
23✔
1083
                }
1084

1085
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1086

1087
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1088
                drop(state);
8✔
1089

1090
                let Some(response) = response else {
8✔
1091
                    warn!("Unable to satisfy batch-block request");
×
1092
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1093
                        .await?;
×
1094
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1095
                };
1096

1097
                debug!("Returning {} blocks in batch response", response.len());
8✔
1098

1099
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1100
                peer.send(response).await?;
8✔
1101

1102
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1103
            }
1104
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1105
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1106

×
1107
                debug!(
×
1108
                    "handling block response batch with {} blocks",
×
1109
                    authenticated_blocks.len()
×
1110
                );
1111

1112
                // (Alan:) why is there even a minimum?
1113
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1114
                    warn!("Got smaller batch response than allowed");
×
1115
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1116
                        .await?;
×
1117
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1118
                }
×
1119

1120
                // Verify that we are in fact in syncing mode
1121
                // TODO: Separate peer messages into those allowed under syncing
1122
                // and those that are not
1123
                let Some(sync_anchor) = self
×
1124
                    .global_state_lock
×
1125
                    .lock_guard()
×
1126
                    .await
×
1127
                    .net
1128
                    .sync_anchor
1129
                    .clone()
×
1130
                else {
1131
                    warn!("Received a batch of blocks without being in syncing mode");
×
1132
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1133
                        .await?;
×
1134
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1135
                };
1136

1137
                // Verify that the response matches the current state
1138
                // We get the latest block from the DB here since this message is
1139
                // only valid for archival nodes.
1140
                let (first_block, _) = &authenticated_blocks[0];
×
1141
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1142
                let most_canonical_own_block_match: Option<Block> = self
×
1143
                    .global_state_lock
×
1144
                    .lock_guard()
×
1145
                    .await
×
1146
                    .chain
1147
                    .archival_state()
×
1148
                    .get_block(first_blocks_parent_digest)
×
1149
                    .await
×
1150
                    .expect("Block lookup must succeed");
×
1151
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1152
                    Some(block) => block,
×
1153
                    None => {
1154
                        warn!("Got batch response with invalid start block");
×
1155
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1156
                            .await?;
×
1157
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1158
                    }
1159
                };
1160

1161
                // Convert all blocks to Block objects
1162
                debug!(
×
1163
                    "Found own block of height {} to match received batch",
×
1164
                    most_canonical_own_block_match.kernel.header.height
×
1165
                );
1166
                let mut received_blocks = vec![];
×
1167
                for (t_block, membership_proof) in authenticated_blocks {
×
1168
                    let Ok(block) = Block::try_from(t_block) else {
×
1169
                        warn!("Received invalid transfer block from peer");
×
1170
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1171
                            .await?;
×
1172
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1173
                    };
1174

1175
                    if !membership_proof.verify(
×
1176
                        block.header().height.into(),
×
1177
                        block.hash(),
×
1178
                        &sync_anchor.block_mmr.peaks(),
×
1179
                        sync_anchor.block_mmr.num_leafs(),
×
1180
                    ) {
×
1181
                        warn!("Authentication of received block fails relative to anchor");
×
1182
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1183
                            .await?;
×
1184
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1185
                    }
×
1186

×
1187
                    received_blocks.push(block);
×
1188
                }
1189

1190
                // Get the latest block that we know of and handle all received blocks
1191
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1192
                    .await?;
×
1193

1194
                // Reward happens as part of `handle_blocks`.
1195

1196
                Ok(KEEP_CONNECTION_ALIVE)
×
1197
            }
1198
            PeerMessage::UnableToSatisfyBatchRequest => {
1199
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1200
                warn!(
×
1201
                    "Peer {} reports inability to satisfy batch request.",
×
1202
                    self.peer_address
1203
                );
1204

1205
                Ok(KEEP_CONNECTION_ALIVE)
×
1206
            }
1207
            PeerMessage::Handshake(_) => {
1208
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1209

×
1210
                // The handshake should have been sent during connection
×
1211
                // initialization. Here it is out of order at best, malicious at
×
1212
                // worst.
×
1213
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1214
                Ok(KEEP_CONNECTION_ALIVE)
×
1215
            }
1216
            PeerMessage::ConnectionStatus(_) => {
1217
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1218

×
1219
                // The connection status should have been sent during connection
×
1220
                // initialization. Here it is out of order at best, malicious at
×
1221
                // worst.
×
1222

×
1223
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1224
                Ok(KEEP_CONNECTION_ALIVE)
×
1225
            }
1226
            PeerMessage::Transaction(transaction) => {
2✔
1227
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
2✔
1228

2✔
1229
                debug!(
2✔
1230
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1231
                    transaction.kernel.inputs.len(),
×
1232
                    transaction.kernel.outputs.len(),
×
1233
                    transaction.kernel.mutator_set_hash
×
1234
                );
1235

1236
                let transaction: Transaction = (*transaction).into();
2✔
1237

2✔
1238
                // 1. If transaction is invalid, punish.
2✔
1239
                if !transaction.is_valid().await {
2✔
1240
                    warn!("Received invalid tx");
×
1241
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1242
                        .await?;
×
1243
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1244
                }
2✔
1245

2✔
1246
                // 2. If transaction has coinbase, punish.
2✔
1247
                // Transactions received from peers have not been mined yet.
2✔
1248
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
2✔
1249
                if transaction.kernel.coinbase.is_some() {
2✔
1250
                    warn!("Received non-mined transaction with coinbase.");
×
1251
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1252
                        .await?;
×
1253
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1254
                }
2✔
1255

2✔
1256
                // 3. If negative fee, punish.
2✔
1257
                if transaction.kernel.fee.is_negative() {
2✔
1258
                    warn!("Received negative-fee transaction.");
×
1259
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1260
                        .await?;
×
1261
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1262
                }
2✔
1263

2✔
1264
                // 4. If transaction is already known, ignore.
2✔
1265
                if self
2✔
1266
                    .global_state_lock
2✔
1267
                    .lock_guard()
2✔
1268
                    .await
2✔
1269
                    .mempool
1270
                    .contains_with_higher_proof_quality(
1271
                        transaction.kernel.txid(),
2✔
1272
                        transaction.proof.proof_quality()?,
2✔
1273
                    )
1274
                {
1275
                    warn!("Received transaction that was already known");
×
1276

1277
                    // We received a transaction that we *probably* haven't requested.
1278
                    // Consider punishing here, if this is abused.
1279
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1280
                }
2✔
1281

1282
                // 5. if transaction is not confirmable, punish.
1283
                let (tip, mutator_set_accumulator_after) = {
2✔
1284
                    let state = self.global_state_lock.lock_guard().await;
2✔
1285

1286
                    (
2✔
1287
                        state.chain.light_state().hash(),
2✔
1288
                        state.chain.light_state().mutator_set_accumulator_after(),
2✔
1289
                    )
2✔
1290
                };
2✔
1291
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
2✔
1292
                    warn!(
×
1293
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1294
                        transaction.kernel.txid()
×
1295
                    );
1296
                    // get fine-grained error code for informative logging
1297
                    let confirmability_error_code = transaction
×
1298
                        .kernel
×
1299
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1300
                    match confirmability_error_code {
×
1301
                        Ok(_) => unreachable!(),
×
1302
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1303
                            warn!("invalid removal record (at index {index})");
×
1304
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1305
                            let removal_record_error_code = invalid_removal_record
×
1306
                                .validate_inner(&mutator_set_accumulator_after);
×
1307
                            debug!(
×
1308
                                "Absolute index set of removal record {index}: {:?}",
×
1309
                                invalid_removal_record.absolute_indices
1310
                            );
1311
                            match removal_record_error_code {
×
1312
                                Ok(_) => unreachable!(),
×
1313
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1314
                                    debug!("invalid because membership proof is missing");
×
1315
                                }
1316
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1317
                                    chunk_index,
×
1318
                                }) => {
×
1319
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1320
                                }
1321
                            };
1322
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1323
                                .await?;
×
1324
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1325
                        }
1326
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1327
                            warn!("duplicate inputs");
×
1328
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1329
                                .await?;
×
1330
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1331
                        }
1332
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1333
                            warn!("already spent input (at index {index})");
×
1334
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1335
                                .await?;
×
1336
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1337
                        }
1338
                    };
1339
                }
2✔
1340

2✔
1341
                // If transaction cannot be applied to mutator set, punish.
2✔
1342
                // I don't think this can happen when above checks pass but we include
2✔
1343
                // the check to ensure that transaction can be applied.
2✔
1344
                let ms_update = MutatorSetUpdate::new(
2✔
1345
                    transaction.kernel.inputs.clone(),
2✔
1346
                    transaction.kernel.outputs.clone(),
2✔
1347
                );
2✔
1348
                let can_apply = ms_update
2✔
1349
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
2✔
1350
                    .is_ok();
2✔
1351
                if !can_apply {
2✔
1352
                    warn!("Cannot apply transaction to current mutator set.");
×
1353
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1354
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1355
                        .await?;
×
1356
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1357
                }
2✔
1358

2✔
1359
                let tx_timestamp = transaction.kernel.timestamp;
2✔
1360

2✔
1361
                // 6. Ignore if transaction is too old
2✔
1362
                let now = self.now();
2✔
1363
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
2✔
1364
                    // TODO: Consider punishing here
1365
                    warn!("Received too old tx");
×
1366
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1367
                }
2✔
1368

2✔
1369
                // 7. Ignore if transaction is too far into the future
2✔
1370
                if tx_timestamp
2✔
1371
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
2✔
1372
                {
1373
                    // TODO: Consider punishing here
1374
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1375
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1376
                }
2✔
1377

2✔
1378
                // Otherwise, relay to main
2✔
1379
                let pt2m_transaction = PeerTaskToMainTransaction {
2✔
1380
                    transaction,
2✔
1381
                    confirmable_for_block: tip,
2✔
1382
                };
2✔
1383
                self.to_main_tx
2✔
1384
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
2✔
1385
                    .await?;
2✔
1386

1387
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1388
            }
1389
            PeerMessage::TransactionNotification(tx_notification) => {
6✔
1390
                // addresses #457
6✔
1391
                // new scope for state read-lock to avoid holding across peer.send()
6✔
1392
                {
6✔
1393
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
6✔
1394

1395
                    // 1. Ignore if we already know this transaction, and
1396
                    // the proof quality is not higher than what we already know.
1397
                    let state = self.global_state_lock.lock_guard().await;
6✔
1398
                    let transaction_of_same_or_higher_proof_quality_is_known =
6✔
1399
                        state.mempool.contains_with_higher_proof_quality(
6✔
1400
                            tx_notification.txid,
6✔
1401
                            tx_notification.proof_quality,
6✔
1402
                        );
6✔
1403
                    if transaction_of_same_or_higher_proof_quality_is_known {
6✔
1404
                        debug!("transaction with same or higher proof quality was already known");
4✔
1405
                        return Ok(KEEP_CONNECTION_ALIVE);
4✔
1406
                    }
2✔
1407

2✔
1408
                    // Only accept transactions that do not require executing
2✔
1409
                    // `update`.
2✔
1410
                    if state
2✔
1411
                        .chain
2✔
1412
                        .light_state()
2✔
1413
                        .mutator_set_accumulator_after()
2✔
1414
                        .hash()
2✔
1415
                        != tx_notification.mutator_set_hash
2✔
1416
                    {
1417
                        debug!("transaction refers to non-canonical mutator set state");
×
1418
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1419
                    }
2✔
1420
                }
2✔
1421

2✔
1422
                // 2. Request the actual `Transaction` from peer
2✔
1423
                debug!("requesting transaction from peer");
2✔
1424
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
2✔
1425
                    .await?;
2✔
1426

1427
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1428
            }
1429
            PeerMessage::TransactionRequest(transaction_identifier) => {
2✔
1430
                let state = self.global_state_lock.lock_guard().await;
2✔
1431
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
2✔
1432
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1433
                };
1434

1435
                let Ok(transfer_transaction) = transaction.try_into() else {
1✔
1436
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
1437
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1438
                };
1439

1440
                // Drop state immediately to prevent holding over a response.
1441
                drop(state);
1✔
1442

1✔
1443
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
1✔
1444
                    .await?;
1✔
1445

1446
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1447
            }
1448
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1449
                let verdict = self
1✔
1450
                    .global_state_lock
1✔
1451
                    .lock_guard()
1✔
1452
                    .await
1✔
1453
                    .favor_incoming_block_proposal(
1✔
1454
                        block_proposal_notification.height,
1✔
1455
                        block_proposal_notification.guesser_fee,
1✔
1456
                    );
1✔
1457
                match verdict {
1✔
1458
                    Ok(_) => {
1459
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1460
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1461
                        ))
1✔
1462
                        .await?
1✔
1463
                    }
1464
                    Err(reject_reason) => {
×
1465
                        info!(
×
1466
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1467
                        {}. Reason:\n{reject_reason}",
×
1468
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1469
                        self.peer_address
1470
                    )
1471
                    }
1472
                }
1473

1474
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1475
            }
1476
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1477
                let matching_proposal = self
×
1478
                    .global_state_lock
×
1479
                    .lock_guard()
×
1480
                    .await
×
1481
                    .mining_state
1482
                    .block_proposal
1483
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1484
                    .map(|x| x.to_owned());
×
1485
                if let Some(proposal) = matching_proposal {
×
1486
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1487
                        .await?;
×
1488
                } else {
1489
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1490
                        .await?;
×
1491
                }
1492

1493
                Ok(KEEP_CONNECTION_ALIVE)
×
1494
            }
1495
            PeerMessage::BlockProposal(block) => {
1✔
1496
                info!("Got block proposal from peer.");
1✔
1497

1498
                let should_punish = {
1✔
1499
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1500

1501
                    let (verdict, tip) = {
1✔
1502
                        let state = self.global_state_lock.lock_guard().await;
1✔
1503

1504
                        let verdict = state.favor_incoming_block_proposal(
1✔
1505
                            block.header().height,
1✔
1506
                            block.total_guesser_reward(),
1✔
1507
                        );
1✔
1508
                        let tip = state.chain.light_state().to_owned();
1✔
1509
                        (verdict, tip)
1✔
1510
                    };
1511

1512
                    if let Err(rejection_reason) = verdict {
1✔
1513
                        match rejection_reason {
×
1514
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1515
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1516
                                if Some(received) == current =>
×
1517
                            {
×
1518
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1519
                                None
×
1520
                            }
1521
                            _ => {
1522
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1523
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1524
                            }
1525
                        }
1526
                    } else {
1527
                        // Verify validity and that proposal is child of current tip
1528
                        if block.is_valid(&tip, self.now()).await {
1✔
1529
                            None // all is well, no punishment.
1✔
1530
                        } else {
1531
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1532
                        }
1533
                    }
1534
                };
1535

1536
                if let Some(sanction) = should_punish {
1✔
1537
                    self.punish(sanction).await?;
×
1538
                } else {
1539
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1540
                        .await?;
1✔
1541

1542
                    // Valuable, new, hard-to-produce information. Reward peer.
1543
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1544
                }
1545

1546
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1547
            }
1548
        }
1549
    }
93✔
1550

1551
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1552
    ///
1553
    /// the channel could potentially fill up in which case the send() will
1554
    /// block until there is capacity.  we wrap the send() so we can log if
1555
    /// that ever happens to the extent it passes slow-scope threshold.
1556
    async fn send_to_main(
1✔
1557
        &self,
1✔
1558
        msg: PeerTaskToMain,
1✔
1559
        line: u32,
1✔
1560
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1561
        // we measure across the send() in case the channel ever fills up.
1✔
1562
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1563

1✔
1564
        self.to_main_tx.send(msg).await
1✔
1565
    }
1✔
1566

1567
    /// Handle message from main task. The boolean return value indicates if
1568
    /// the connection should be closed.
1569
    ///
1570
    /// Locking:
1571
    ///   * acquires `global_state_lock` for write via Self::punish()
1572
    async fn handle_main_task_message<S>(
×
1573
        &mut self,
×
1574
        msg: MainToPeerTask,
×
1575
        peer: &mut S,
×
1576
        peer_state_info: &mut MutablePeerState,
×
1577
    ) -> Result<bool>
×
1578
    where
×
1579
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
×
1580
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
×
1581
        <S as TryStream>::Error: std::error::Error,
×
1582
    {
×
1583
        debug!("Handling {} message from main in peer loop", msg.get_type());
×
1584
        match msg {
×
1585
            MainToPeerTask::Block(block) => {
×
1586
                // We don't currently differentiate whether a new block came from a peer, or from our
×
1587
                // own miner. It's always shared through this logic.
×
1588
                let new_block_height = block.kernel.header.height;
×
1589
                if new_block_height > peer_state_info.highest_shared_block_height {
×
1590
                    debug!("Sending PeerMessage::BlockNotification");
×
1591
                    peer_state_info.highest_shared_block_height = new_block_height;
×
1592
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
×
1593
                        .await?;
×
1594
                    debug!("Sent PeerMessage::BlockNotification");
×
1595
                }
×
1596
                Ok(KEEP_CONNECTION_ALIVE)
×
1597
            }
1598
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1599
                // Only ask one of the peers about the batch of blocks
×
1600
                if batch_block_request.peer_addr_target != self.peer_address {
×
1601
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1602
                }
×
1603

×
1604
                let max_response_len = std::cmp::min(
×
1605
                    STANDARD_BLOCK_BATCH_SIZE,
×
1606
                    self.global_state_lock.cli().sync_mode_threshold,
×
1607
                );
×
1608

×
1609
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1610
                    known_blocks: batch_block_request.known_blocks,
×
1611
                    max_response_len,
×
1612
                    anchor: batch_block_request.anchor_mmr,
×
1613
                }))
×
1614
                .await?;
×
1615

1616
                Ok(KEEP_CONNECTION_ALIVE)
×
1617
            }
1618
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1619
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1620

×
1621
                if self.peer_address != socket_addr {
×
1622
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1623
                }
×
1624

×
1625
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1626
                    .await?;
×
1627

1628
                // If this peer failed the last synchronization attempt, we only
1629
                // sanction, we don't disconnect.
1630
                Ok(KEEP_CONNECTION_ALIVE)
×
1631
            }
1632
            MainToPeerTask::MakePeerDiscoveryRequest => {
1633
                peer.send(PeerMessage::PeerListRequest).await?;
×
1634
                Ok(KEEP_CONNECTION_ALIVE)
×
1635
            }
1636
            MainToPeerTask::Disconnect(peer_address) => {
×
1637
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1638

×
1639
                // Only disconnect from the peer the main task requested a disconnect for.
×
1640
                if peer_address != self.peer_address {
×
1641
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1642
                }
×
1643
                self.register_peer_disconnection().await;
×
1644

1645
                Ok(DISCONNECT_CONNECTION)
×
1646
            }
1647
            MainToPeerTask::DisconnectAll() => {
1648
                self.register_peer_disconnection().await;
×
1649

1650
                Ok(DISCONNECT_CONNECTION)
×
1651
            }
1652
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1653
                if target_socket_addr == self.peer_address {
×
1654
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1655
                }
×
1656
                Ok(KEEP_CONNECTION_ALIVE)
×
1657
            }
1658
            MainToPeerTask::TransactionNotification(transaction_notification) => {
×
1659
                debug!("Sending PeerMessage::TransactionNotification");
×
1660
                peer.send(PeerMessage::TransactionNotification(
×
1661
                    transaction_notification,
×
1662
                ))
×
1663
                .await?;
×
1664
                debug!("Sent PeerMessage::TransactionNotification");
×
1665
                Ok(KEEP_CONNECTION_ALIVE)
×
1666
            }
1667
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1668
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1669
                peer.send(PeerMessage::BlockProposalNotification(
×
1670
                    block_proposal_notification,
×
1671
                ))
×
1672
                .await?;
×
1673
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1674
                Ok(KEEP_CONNECTION_ALIVE)
×
1675
            }
1676
        }
1677
    }
×
1678

1679
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1680
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1681
    async fn run<S>(
41✔
1682
        &mut self,
41✔
1683
        mut peer: S,
41✔
1684
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
41✔
1685
        peer_state_info: &mut MutablePeerState,
41✔
1686
    ) -> Result<()>
41✔
1687
    where
41✔
1688
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
41✔
1689
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
41✔
1690
        <S as TryStream>::Error: std::error::Error,
41✔
1691
    {
41✔
1692
        loop {
1693
            select! {
93✔
1694
                // Handle peer messages
1695
                peer_message = peer.try_next() => {
93✔
1696
                    let peer_address = self.peer_address;
93✔
1697
                    let peer_message = match peer_message {
93✔
1698
                        Ok(message) => message,
93✔
1699
                        Err(err) => {
×
1700
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
1701
                            error!("{msg}. Error: {err}");
×
1702
                            bail!("{msg}. Closing connection.");
×
1703
                        }
1704
                    };
1705
                    let Some(peer_message) = peer_message else {
93✔
1706
                        info!("Peer {peer_address} closed connection.");
×
1707
                        break;
×
1708
                    };
1709

1710
                    let syncing =
93✔
1711
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
93✔
1712
                    let message_type = peer_message.get_type();
93✔
1713
                    if peer_message.ignore_during_sync() && syncing {
93✔
1714
                        debug!(
×
1715
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1716
                        );
1717
                        continue;
×
1718
                    }
93✔
1719
                    if peer_message.ignore_when_not_sync() && !syncing {
93✔
1720
                        debug!(
×
1721
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1722
                        );
1723
                        continue;
×
1724
                    }
93✔
1725

93✔
1726
                    match self
93✔
1727
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
93✔
1728
                        .await
93✔
1729
                    {
1730
                        Ok(false) => {}
52✔
1731
                        Ok(true) => {
1732
                            info!("Closing connection to {peer_address}");
40✔
1733
                            break;
40✔
1734
                        }
1735
                        Err(err) => {
1✔
1736
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1737
                            bail!("{err}");
1✔
1738
                        }
1739
                    };
1740
                }
1741

1742
                // Handle messages from main task
1743
                main_msg_res = from_main_rx.recv() => {
93✔
1744
                    let main_msg = main_msg_res
×
1745
                        .unwrap_or_else(|e| panic!("Failed to read from main loop: {e}"));
×
1746
                    let close_connection = self
×
1747
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
×
1748
                        .await
×
1749
                        .unwrap_or_else(|err| {
×
1750
                            warn!("handle_main_task_message returned an error: {err}");
×
1751
                            true
×
1752
                        });
×
1753

×
1754
                    if close_connection {
×
1755
                        info!(
×
1756
                            "handle_main_task_message is closing the connection to {}",
×
1757
                            self.peer_address
1758
                        );
1759
                        break;
×
1760
                    }
×
1761
                }
1762
            }
1763
        }
1764

1765
        Ok(())
40✔
1766
    }
41✔
1767

1768
    /// Function called before entering the peer loop. Reads the potentially stored
1769
    /// peer standing from the database and does other book-keeping before entering
1770
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1771
    /// accepted for a connection for this loop to be entered. So we don't need
1772
    /// to check the standing again.
1773
    ///
1774
    /// Locking:
1775
    ///   * acquires `global_state_lock` for write
1776
    pub(crate) async fn run_wrapper<S>(
32✔
1777
        &mut self,
32✔
1778
        mut peer: S,
32✔
1779
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
32✔
1780
    ) -> Result<()>
32✔
1781
    where
32✔
1782
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
32✔
1783
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
32✔
1784
        <S as TryStream>::Error: std::error::Error,
32✔
1785
    {
32✔
1786
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1787

1788
        let cli_args = self.global_state_lock.cli().clone();
32✔
1789

1790
        let standing = self
32✔
1791
            .global_state_lock
32✔
1792
            .lock_guard()
32✔
1793
            .await
32✔
1794
            .net
1795
            .peer_databases
1796
            .peer_standings
1797
            .get(self.peer_address.ip())
32✔
1798
            .await
32✔
1799
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
31✔
1800

31✔
1801
        // Add peer to peer map
31✔
1802
        let peer_connection_info = PeerConnectionInfo::new(
31✔
1803
            self.peer_handshake_data.listen_port,
31✔
1804
            self.peer_address,
31✔
1805
            self.inbound_connection,
31✔
1806
        );
31✔
1807
        let new_peer = PeerInfo::new(
31✔
1808
            peer_connection_info,
31✔
1809
            &self.peer_handshake_data,
31✔
1810
            SystemTime::now(),
31✔
1811
            cli_args.peer_tolerance,
31✔
1812
        )
31✔
1813
        .with_standing(standing);
31✔
1814

31✔
1815
        // If timestamps are different, we currently just log a warning.
31✔
1816
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
31✔
1817
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
31✔
1818
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
31✔
1819
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
31✔
1820
        {
1821
            let own_datetime_utc: DateTime<Utc> =
×
1822
                new_peer.own_timestamp_connection_established.into();
×
1823
            let peer_datetime_utc: DateTime<Utc> =
×
1824
                new_peer.peer_timestamp_connection_established.into();
×
1825
            warn!(
×
1826
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1827
                new_peer.connected_address(),
×
1828
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1829
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1830
        }
31✔
1831

1832
        // Multiple tasks might attempt to set up a connection concurrently. So
1833
        // even though we've checked that this connection is allowed, this check
1834
        // could have been invalidated by another task, for one accepting an
1835
        // incoming connection from a peer we're currently connecting to. So we
1836
        // need to make the a check again while holding a write-lock, since
1837
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
1838
        // since it would have to be dropped before acquiring the write-lock.
1839
        {
1840
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
31✔
1841
            let peer_map = &mut global_state.net.peer_map;
31✔
1842
            if peer_map
31✔
1843
                .values()
31✔
1844
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
31✔
1845
            {
1846
                bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1847
            }
31✔
1848

31✔
1849
            if peer_map.len() >= cli_args.max_num_peers {
31✔
1850
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1851
            }
31✔
1852

31✔
1853
            if peer_map.contains_key(&self.peer_address) {
31✔
1854
                // This shouldn't be possible, unless the peer reports a different instance ID than
1855
                // for the other connection. Only a malignant client would do that.
1856
                bail!("Already connected to peer. Aborting connection");
×
1857
            }
31✔
1858

31✔
1859
            peer_map.insert(self.peer_address, new_peer);
31✔
1860
        }
31✔
1861

31✔
1862
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
31✔
1863
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
31✔
1864

31✔
1865
        // If peer indicates more canonical block, request a block notification to catch up ASAP
31✔
1866
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
31✔
1867
            > self
31✔
1868
                .global_state_lock
31✔
1869
                .lock_guard()
31✔
1870
                .await
31✔
1871
                .chain
1872
                .light_state()
31✔
1873
                .kernel
1874
                .header
1875
                .cumulative_proof_of_work
1876
        {
1877
            // Send block notification request to catch up ASAP, in case we're
1878
            // behind the newly-connected peer.
1879
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1880
        }
31✔
1881

1882
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
31✔
1883
        debug!("Exited peer loop for {}", self.peer_address);
31✔
1884

1885
        close_peer_connected_callback(
31✔
1886
            self.global_state_lock.clone(),
31✔
1887
            self.peer_address,
31✔
1888
            &self.to_main_tx,
31✔
1889
        )
31✔
1890
        .await;
31✔
1891

1892
        debug!("Ending peer loop for {}", self.peer_address);
31✔
1893

1894
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1895
        // feature to have for testing purposes.
1896
        res
31✔
1897
    }
31✔
1898

1899
    /// Register graceful peer disconnection in the global state.
1900
    ///
1901
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1902
    ///
1903
    /// # Locking:
1904
    ///   * acquires `global_state_lock` for write
1905
    ///
1906
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1907
    async fn register_peer_disconnection(&mut self) {
×
1908
        let peer_id = self.peer_handshake_data.instance_id;
×
1909
        self.global_state_lock
×
1910
            .lock_guard_mut()
×
1911
            .await
×
1912
            .net
1913
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1914
    }
×
1915
}
1916

1917
#[cfg(test)]
1918
mod peer_loop_tests {
1919
    use rand::rngs::StdRng;
1920
    use rand::Rng;
1921
    use rand::SeedableRng;
1922
    use tokio::sync::mpsc::error::TryRecvError;
1923
    use tracing_test::traced_test;
1924

1925
    use super::*;
1926
    use crate::config_models::cli_args;
1927
    use crate::config_models::network::Network;
1928
    use crate::job_queue::triton_vm::TritonVmJobQueue;
1929
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
1930
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1931
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1932
    use crate::models::peer::transaction_notification::TransactionNotification;
1933
    use crate::models::state::mempool::TransactionOrigin;
1934
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1935
    use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
1936
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1937
    use crate::tests::shared::fake_valid_block_for_tests;
1938
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1939
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1940
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1941
    use crate::tests::shared::get_dummy_socket_address;
1942
    use crate::tests::shared::get_test_genesis_setup;
1943
    use crate::tests::shared::invalid_empty_single_proof_transaction;
1944
    use crate::tests::shared::Action;
1945
    use crate::tests::shared::Mock;
1946

1947
    #[traced_test]
×
1948
    #[tokio::test]
1949
    async fn test_peer_loop_bye() -> Result<()> {
1✔
1950
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
1951

1✔
1952
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
1953
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1✔
1954

1✔
1955
        let peer_address = get_dummy_socket_address(2);
1✔
1956
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
1957
        let mut peer_loop_handler =
1✔
1958
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1✔
1959
        peer_loop_handler
1✔
1960
            .run_wrapper(mock, from_main_rx_clone)
1✔
1961
            .await?;
1✔
1962

1✔
1963
        assert_eq!(
1✔
1964
            2,
1✔
1965
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
1966
            "peer map length must be back to 2 after goodbye"
1✔
1967
        );
1✔
1968

1✔
1969
        Ok(())
1✔
1970
    }
1✔
1971

1972
    #[traced_test]
×
1973
    #[tokio::test]
1974
    async fn test_peer_loop_peer_list() {
1✔
1975
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
1✔
1976
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
1✔
1977
                .await
1✔
1978
                .unwrap();
1✔
1979

1✔
1980
        let mut peer_infos = state_lock
1✔
1981
            .lock_guard()
1✔
1982
            .await
1✔
1983
            .net
1✔
1984
            .peer_map
1✔
1985
            .clone()
1✔
1986
            .into_values()
1✔
1987
            .collect::<Vec<_>>();
1✔
1988
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2✔
1989
        let (peer_address0, instance_id0) = (
1✔
1990
            peer_infos[0].connected_address(),
1✔
1991
            peer_infos[0].instance_id(),
1✔
1992
        );
1✔
1993
        let (peer_address1, instance_id1) = (
1✔
1994
            peer_infos[1].connected_address(),
1✔
1995
            peer_infos[1].instance_id(),
1✔
1996
        );
1✔
1997

1✔
1998
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
1✔
1999
        let expected_response = vec![
1✔
2000
            (peer_address0, instance_id0),
1✔
2001
            (peer_address1, instance_id1),
1✔
2002
            (sa2, hsd2.instance_id),
1✔
2003
        ];
1✔
2004
        let mock = Mock::new(vec![
1✔
2005
            Action::Read(PeerMessage::PeerListRequest),
1✔
2006
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
1✔
2007
            Action::Read(PeerMessage::Bye),
1✔
2008
        ]);
1✔
2009

1✔
2010
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2011

1✔
2012
        let mut peer_loop_handler =
1✔
2013
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
1✔
2014
        peer_loop_handler
1✔
2015
            .run_wrapper(mock, from_main_rx_clone)
1✔
2016
            .await
1✔
2017
            .unwrap();
1✔
2018

1✔
2019
        assert_eq!(
1✔
2020
            2,
1✔
2021
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
2022
            "peer map must have length 2 after saying goodbye to peer 2"
1✔
2023
        );
1✔
2024
    }
1✔
2025

2026
    #[traced_test]
×
2027
    #[tokio::test]
2028
    async fn different_genesis_test() -> Result<()> {
1✔
2029
        // In this scenario a peer provides another genesis block than what has been
1✔
2030
        // hardcoded. This should lead to the closing of the connection to this peer
1✔
2031
        // and a ban.
1✔
2032

1✔
2033
        let network = Network::Main;
1✔
2034
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2035
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2036
        assert_eq!(1000, state_lock.cli().peer_tolerance);
1✔
2037
        let peer_address = get_dummy_socket_address(0);
1✔
2038

1✔
2039
        // Although the database is empty, `get_latest_block` still returns the genesis block,
1✔
2040
        // since that block is hardcoded.
1✔
2041
        let mut different_genesis_block = state_lock
1✔
2042
            .lock_guard()
1✔
2043
            .await
1✔
2044
            .chain
1✔
2045
            .archival_state()
1✔
2046
            .get_tip()
1✔
2047
            .await;
1✔
2048

1✔
2049
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
1✔
2050
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
1✔
2051
            &different_genesis_block,
1✔
2052
            Timestamp::hours(1),
1✔
2053
            StdRng::seed_from_u64(5550001).random(),
1✔
2054
        )
1✔
2055
        .await;
1✔
2056
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
1✔
2057
            block_1_with_different_genesis.try_into().unwrap(),
1✔
2058
        )))]);
1✔
2059

1✔
2060
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2061
            to_main_tx.clone(),
1✔
2062
            state_lock.clone(),
1✔
2063
            peer_address,
1✔
2064
            hsd,
1✔
2065
            true,
1✔
2066
            1,
1✔
2067
        );
1✔
2068
        let res = peer_loop_handler
1✔
2069
            .run_wrapper(mock, from_main_rx_clone)
1✔
2070
            .await;
1✔
2071
        assert!(
1✔
2072
            res.is_err(),
1✔
2073
            "run_wrapper must return failure when genesis is different"
1✔
2074
        );
1✔
2075

1✔
2076
        match to_main_rx1.recv().await {
1✔
2077
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2078
            _ => bail!("Must receive remove of peer block max height"),
1✔
2079
        }
1✔
2080

1✔
2081
        // Verify that no further message was sent to main loop
1✔
2082
        match to_main_rx1.try_recv() {
1✔
2083
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2084
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2085
        };
1✔
2086

1✔
2087
        drop(to_main_tx);
1✔
2088

1✔
2089
        let peer_standing = state_lock
1✔
2090
            .lock_guard()
1✔
2091
            .await
1✔
2092
            .net
1✔
2093
            .get_peer_standing_from_database(peer_address.ip())
1✔
2094
            .await;
1✔
2095
        assert_eq!(
1✔
2096
            -i32::from(state_lock.cli().peer_tolerance),
1✔
2097
            peer_standing.unwrap().standing
1✔
2098
        );
1✔
2099
        assert_eq!(
1✔
2100
            NegativePeerSanction::DifferentGenesis,
1✔
2101
            peer_standing.unwrap().latest_punishment.unwrap().0
1✔
2102
        );
1✔
2103

1✔
2104
        Ok(())
1✔
2105
    }
1✔
2106

2107
    #[traced_test]
×
2108
    #[tokio::test]
2109
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
1✔
2110
    {
1✔
2111
        let args = cli_args::Args::default();
1✔
2112
        let network = args.network;
1✔
2113
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
1✔
2114
            get_test_genesis_setup(network, 0, args).await?;
1✔
2115

1✔
2116
        let peer_address = get_dummy_socket_address(0);
1✔
2117
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
1✔
2118
        let peer_id = peer_handshake_data.instance_id;
1✔
2119
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2120
            to_main_tx,
1✔
2121
            state_lock.clone(),
1✔
2122
            peer_address,
1✔
2123
            peer_handshake_data,
1✔
2124
            true,
1✔
2125
            1,
1✔
2126
        );
1✔
2127
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2128
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
1✔
2129

1✔
2130
        let global_state = state_lock.lock_guard().await;
1✔
2131
        assert!(global_state
1✔
2132
            .net
1✔
2133
            .last_disconnection_time_of_peer(peer_id)
1✔
2134
            .is_none());
1✔
2135

1✔
2136
        drop(to_main_rx);
1✔
2137
        drop(from_main_tx);
1✔
2138

1✔
2139
        Ok(())
1✔
2140
    }
1✔
2141

2142
    #[traced_test]
×
2143
    #[tokio::test]
2144
    async fn block_without_valid_pow_test() -> Result<()> {
1✔
2145
        // In this scenario, a block without a valid PoW is received. This block should be rejected
1✔
2146
        // by the peer loop and a notification should never reach the main loop.
1✔
2147

1✔
2148
        let network = Network::Main;
1✔
2149
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2150
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2151
        let peer_address = get_dummy_socket_address(0);
1✔
2152
        let genesis_block: Block = state_lock
1✔
2153
            .lock_guard()
1✔
2154
            .await
1✔
2155
            .chain
1✔
2156
            .archival_state()
1✔
2157
            .get_tip()
1✔
2158
            .await;
1✔
2159

1✔
2160
        // Make a with hash above what the implied threshold from
1✔
2161
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
1✔
2162
            &genesis_block,
1✔
2163
            Timestamp::hours(1),
1✔
2164
            StdRng::seed_from_u64(5550001).random(),
1✔
2165
        )
1✔
2166
        .await;
1✔
2167

1✔
2168
        // This *probably* is invalid PoW -- and needs to be for this test to
1✔
2169
        // work.
1✔
2170
        block_without_valid_pow.set_header_nonce(Digest::default());
1✔
2171

1✔
2172
        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
1✔
2173
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
1✔
2174
        let mock = Mock::new(vec![
1✔
2175
            Action::Read(PeerMessage::Block(Box::new(
1✔
2176
                block_without_valid_pow.clone().try_into().unwrap(),
1✔
2177
            ))),
1✔
2178
            Action::Read(PeerMessage::Bye),
1✔
2179
        ]);
1✔
2180

1✔
2181
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2182

1✔
2183
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2184
            to_main_tx.clone(),
1✔
2185
            state_lock.clone(),
1✔
2186
            peer_address,
1✔
2187
            hsd,
1✔
2188
            true,
1✔
2189
            1,
1✔
2190
            block_without_valid_pow.header().timestamp,
1✔
2191
        );
1✔
2192
        peer_loop_handler
1✔
2193
            .run_wrapper(mock, from_main_rx_clone)
1✔
2194
            .await
1✔
2195
            .expect("sending (one) invalid block should not result in closed connection");
1✔
2196

1✔
2197
        match to_main_rx1.recv().await {
1✔
2198
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2199
            _ => bail!("Must receive remove of peer block max height"),
1✔
2200
        }
1✔
2201

1✔
2202
        // Verify that no further message was sent to main loop
1✔
2203
        match to_main_rx1.try_recv() {
1✔
2204
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2205
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2206
        };
1✔
2207

1✔
2208
        // We need to have the transmitter in scope until we have received from it
1✔
2209
        // otherwise the receiver will report the disconnected error when we attempt
1✔
2210
        // to read from it. And the purpose is to verify that the channel is empty,
1✔
2211
        // not that it has been closed.
1✔
2212
        drop(to_main_tx);
1✔
2213

1✔
2214
        // Verify that peer standing was stored in database
1✔
2215
        let standing = state_lock
1✔
2216
            .lock_guard()
1✔
2217
            .await
1✔
2218
            .net
1✔
2219
            .peer_databases
1✔
2220
            .peer_standings
1✔
2221
            .get(peer_address.ip())
1✔
2222
            .await
1✔
2223
            .unwrap();
1✔
2224
        assert!(
1✔
2225
            standing.standing < 0,
1✔
2226
            "Peer must be sanctioned for sending a bad block"
1✔
2227
        );
1✔
2228

1✔
2229
        Ok(())
1✔
2230
    }
1✔
2231

2232
    #[traced_test]
×
2233
    #[tokio::test]
2234
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
1✔
2235
        // The scenario tested here is that a client receives a block that is already
1✔
2236
        // known and stored. The expected behavior is to ignore the block and not send
1✔
2237
        // a message to the main task.
1✔
2238

1✔
2239
        let network = Network::Main;
1✔
2240
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
1✔
2241
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2242
        let peer_address = get_dummy_socket_address(0);
1✔
2243
        let genesis_block: Block = Block::genesis(network);
1✔
2244

1✔
2245
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
2246
        let block_1 =
1✔
2247
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
1✔
2248
        assert!(
1✔
2249
            block_1.is_valid(&genesis_block, now).await,
1✔
2250
            "Block must be valid for this test to make sense"
1✔
2251
        );
1✔
2252
        alice.set_new_tip(block_1.clone()).await?;
1✔
2253

1✔
2254
        let mock_peer_messages = Mock::new(vec![
1✔
2255
            Action::Read(PeerMessage::Block(Box::new(
1✔
2256
                block_1.clone().try_into().unwrap(),
1✔
2257
            ))),
1✔
2258
            Action::Read(PeerMessage::Bye),
1✔
2259
        ]);
1✔
2260

1✔
2261
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2262

1✔
2263
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2264
            to_main_tx.clone(),
1✔
2265
            alice.clone(),
1✔
2266
            peer_address,
1✔
2267
            hsd,
1✔
2268
            false,
1✔
2269
            1,
1✔
2270
            block_1.header().timestamp,
1✔
2271
        );
1✔
2272
        alice_peer_loop_handler
1✔
2273
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
1✔
2274
            .await?;
1✔
2275

1✔
2276
        match to_main_rx1.recv().await {
1✔
2277
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2278
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
1✔
2279
        }
1✔
2280
        match to_main_rx1.try_recv() {
1✔
2281
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2282
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2283
        };
1✔
2284
        drop(to_main_tx);
1✔
2285

1✔
2286
        if !alice.lock_guard().await.net.peer_map.is_empty() {
1✔
2287
            bail!("peer map must be empty after closing connection gracefully");
1✔
2288
        }
1✔
2289

1✔
2290
        Ok(())
1✔
2291
    }
1✔
2292

2293
    #[traced_test]
×
2294
    #[tokio::test]
2295
    async fn block_request_batch_simple() {
1✔
2296
        // Scenario: Six blocks (including genesis) are known. Peer requests
1✔
2297
        // from all possible starting points, and client responds with the
1✔
2298
        // correct list of blocks.
1✔
2299
        let network = Network::Main;
1✔
2300
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2301
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2302
                .await
1✔
2303
                .unwrap();
1✔
2304
        let genesis_block: Block = Block::genesis(network);
1✔
2305
        let peer_address = get_dummy_socket_address(0);
1✔
2306
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
2307
            fake_valid_sequence_of_blocks_for_tests(
1✔
2308
                &genesis_block,
1✔
2309
                Timestamp::hours(1),
1✔
2310
                StdRng::seed_from_u64(5550001).random(),
1✔
2311
            )
1✔
2312
            .await;
1✔
2313
        let blocks = vec![
1✔
2314
            genesis_block,
1✔
2315
            block_1,
1✔
2316
            block_2,
1✔
2317
            block_3,
1✔
2318
            block_4,
1✔
2319
            block_5.clone(),
1✔
2320
        ];
1✔
2321
        for block in blocks.iter().skip(1) {
5✔
2322
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
5✔
2323
        }
1✔
2324

1✔
2325
        let mmra = state_lock
1✔
2326
            .lock_guard()
1✔
2327
            .await
1✔
2328
            .chain
1✔
2329
            .archival_state()
1✔
2330
            .archival_block_mmr
1✔
2331
            .ammr()
1✔
2332
            .to_accumulator_async()
1✔
2333
            .await;
1✔
2334
        for i in 0..=4 {
6✔
2335
            let expected_response = {
5✔
2336
                let state = state_lock.lock_guard().await;
5✔
2337
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
5✔
2338
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
5✔
2339
                    .await
5✔
2340
                    .unwrap()
5✔
2341
            };
5✔
2342
            let mock = Mock::new(vec![
5✔
2343
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
5✔
2344
                    known_blocks: vec![blocks[i].hash()],
5✔
2345
                    max_response_len: 14,
5✔
2346
                    anchor: mmra.clone(),
5✔
2347
                })),
5✔
2348
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
5✔
2349
                Action::Read(PeerMessage::Bye),
5✔
2350
            ]);
5✔
2351
            let mut peer_loop_handler = PeerLoopHandler::new(
5✔
2352
                to_main_tx.clone(),
5✔
2353
                state_lock.clone(),
5✔
2354
                peer_address,
5✔
2355
                hsd.clone(),
5✔
2356
                false,
5✔
2357
                1,
5✔
2358
            );
5✔
2359

5✔
2360
            peer_loop_handler
5✔
2361
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
5✔
2362
                .await
5✔
2363
                .unwrap();
5✔
2364
        }
1✔
2365
    }
1✔
2366

2367
    #[traced_test]
×
2368
    #[tokio::test]
2369
    async fn block_request_batch_in_order_test() -> Result<()> {
1✔
2370
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2371
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
1✔
2372
        // are returned.
1✔
2373

1✔
2374
        let network = Network::Main;
1✔
2375
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2376
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2377
        let genesis_block: Block = Block::genesis(network);
1✔
2378
        let peer_address = get_dummy_socket_address(0);
1✔
2379
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2380
            &genesis_block,
1✔
2381
            Timestamp::hours(1),
1✔
2382
            StdRng::seed_from_u64(5550001).random(),
1✔
2383
        )
1✔
2384
        .await;
1✔
2385
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2386
            &block_1,
1✔
2387
            Timestamp::hours(1),
1✔
2388
            StdRng::seed_from_u64(5550002).random(),
1✔
2389
        )
1✔
2390
        .await;
1✔
2391
        assert_ne!(block_2_b.hash(), block_2_a.hash());
1✔
2392

1✔
2393
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2394
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2395
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2396
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2397
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2398

1✔
2399
        let anchor = state_lock
1✔
2400
            .lock_guard()
1✔
2401
            .await
1✔
2402
            .chain
1✔
2403
            .archival_state()
1✔
2404
            .archival_block_mmr
1✔
2405
            .ammr()
1✔
2406
            .to_accumulator_async()
1✔
2407
            .await;
1✔
2408
        let response_1 = {
1✔
2409
            let state_lock = state_lock.lock_guard().await;
1✔
2410
            PeerLoopHandler::batch_response(
1✔
2411
                &state_lock,
1✔
2412
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
1✔
2413
                &anchor,
1✔
2414
            )
1✔
2415
            .await
1✔
2416
            .unwrap()
1✔
2417
        };
1✔
2418

1✔
2419
        let mut mock = Mock::new(vec![
1✔
2420
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2421
                known_blocks: vec![genesis_block.hash()],
1✔
2422
                max_response_len: 14,
1✔
2423
                anchor: anchor.clone(),
1✔
2424
            })),
1✔
2425
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
1✔
2426
            Action::Read(PeerMessage::Bye),
1✔
2427
        ]);
1✔
2428

1✔
2429
        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
1✔
2430
            to_main_tx.clone(),
1✔
2431
            state_lock.clone(),
1✔
2432
            peer_address,
1✔
2433
            hsd.clone(),
1✔
2434
            false,
1✔
2435
            1,
1✔
2436
            block_3_a.header().timestamp,
1✔
2437
        );
1✔
2438

1✔
2439
        peer_loop_handler_1
1✔
2440
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
1✔
2441
            .await?;
1✔
2442

1✔
2443
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2444
        let response_2 = {
1✔
2445
            let state_lock = state_lock.lock_guard().await;
1✔
2446
            PeerLoopHandler::batch_response(
1✔
2447
                &state_lock,
1✔
2448
                vec![block_2_a, block_3_a.clone()],
1✔
2449
                &anchor,
1✔
2450
            )
1✔
2451
            .await
1✔
2452
            .unwrap()
1✔
2453
        };
1✔
2454
        mock = Mock::new(vec![
1✔
2455
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2456
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
1✔
2457
                max_response_len: 14,
1✔
2458
                anchor,
1✔
2459
            })),
1✔
2460
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
1✔
2461
            Action::Read(PeerMessage::Bye),
1✔
2462
        ]);
1✔
2463

1✔
2464
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2465
            to_main_tx.clone(),
1✔
2466
            state_lock.clone(),
1✔
2467
            peer_address,
1✔
2468
            hsd,
1✔
2469
            false,
1✔
2470
            1,
1✔
2471
            block_3_a.header().timestamp,
1✔
2472
        );
1✔
2473

1✔
2474
        peer_loop_handler_2
1✔
2475
            .run_wrapper(mock, from_main_rx_clone)
1✔
2476
            .await?;
1✔
2477

1✔
2478
        Ok(())
1✔
2479
    }
1✔
2480

2481
    #[traced_test]
×
2482
    #[tokio::test]
2483
    async fn block_request_batch_out_of_order_test() -> Result<()> {
1✔
2484
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2485
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
1✔
2486
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
1✔
2487
        // The blocks will be supplied in the correct order but starting from the first digest in
1✔
2488
        // the list that is known and canonical.
1✔
2489

1✔
2490
        let network = Network::Main;
1✔
2491
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2492
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2493
        let genesis_block = Block::genesis(network);
1✔
2494
        let peer_address = get_dummy_socket_address(0);
1✔
2495
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2496
            &genesis_block,
1✔
2497
            Timestamp::hours(1),
1✔
2498
            StdRng::seed_from_u64(5550001).random(),
1✔
2499
        )
1✔
2500
        .await;
1✔
2501
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2502
            &block_1,
1✔
2503
            Timestamp::hours(1),
1✔
2504
            StdRng::seed_from_u64(5550002).random(),
1✔
2505
        )
1✔
2506
        .await;
1✔
2507
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2508

1✔
2509
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2510
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2511
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2512
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2513
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2514

1✔
2515
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2516
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
1✔
2517
        expected_anchor.append(block_3_a.hash());
1✔
2518
        let state_anchor = state_lock
1✔
2519
            .lock_guard()
1✔
2520
            .await
1✔
2521
            .chain
1✔
2522
            .archival_state()
1✔
2523
            .archival_block_mmr
1✔
2524
            .ammr()
1✔
2525
            .to_accumulator_async()
1✔
2526
            .await;
1✔
2527
        assert_eq!(
1✔
2528
            expected_anchor, state_anchor,
1✔
2529
            "Catching assumption about MMRA in tip and in archival state"
1✔
2530
        );
1✔
2531

1✔
2532
        let response = {
1✔
2533
            let state_lock = state_lock.lock_guard().await;
1✔
2534
            PeerLoopHandler::batch_response(
1✔
2535
                &state_lock,
1✔
2536
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
1✔
2537
                &expected_anchor,
1✔
2538
            )
1✔
2539
            .await
1✔
2540
            .unwrap()
1✔
2541
        };
1✔
2542
        let mock = Mock::new(vec![
1✔
2543
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2544
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
1✔
2545
                max_response_len: 14,
1✔
2546
                anchor: expected_anchor,
1✔
2547
            })),
1✔
2548
            // Since genesis block is the 1st known in the list of known blocks,
1✔
2549
            // it's immediate descendent, block_1, is the first one returned.
1✔
2550
            Action::Write(PeerMessage::BlockResponseBatch(response)),
1✔
2551
            Action::Read(PeerMessage::Bye),
1✔
2552
        ]);
1✔
2553

1✔
2554
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2555
            to_main_tx.clone(),
1✔
2556
            state_lock.clone(),
1✔
2557
            peer_address,
1✔
2558
            hsd,
1✔
2559
            false,
1✔
2560
            1,
1✔
2561
            block_3_a.header().timestamp,
1✔
2562
        );
1✔
2563

1✔
2564
        peer_loop_handler_2
1✔
2565
            .run_wrapper(mock, from_main_rx_clone)
1✔
2566
            .await?;
1✔
2567

1✔
2568
        Ok(())
1✔
2569
    }
1✔
2570

2571
    #[traced_test]
×
2572
    #[tokio::test]
2573
    async fn request_unknown_height_doesnt_crash() {
1✔
2574
        // Scenario: Only genesis block is known. Peer requests block of height
1✔
2575
        // 2.
1✔
2576
        let network = Network::Main;
1✔
2577
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2578
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2579
                .await
1✔
2580
                .unwrap();
1✔
2581
        let peer_address = get_dummy_socket_address(0);
1✔
2582
        let mock = Mock::new(vec![
1✔
2583
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2584
            Action::Read(PeerMessage::Bye),
1✔
2585
        ]);
1✔
2586

1✔
2587
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2588
            to_main_tx.clone(),
1✔
2589
            state_lock.clone(),
1✔
2590
            peer_address,
1✔
2591
            hsd,
1✔
2592
            false,
1✔
2593
            1,
1✔
2594
        );
1✔
2595

1✔
2596
        // This will return error if seen read/write order does not match that of the
1✔
2597
        // mocked object.
1✔
2598
        peer_loop_handler
1✔
2599
            .run_wrapper(mock, from_main_rx_clone)
1✔
2600
            .await
1✔
2601
            .unwrap();
1✔
2602

1✔
2603
        // Verify that peer is sanctioned for this nonsense.
1✔
2604
        assert!(state_lock
1✔
2605
            .lock_guard()
1✔
2606
            .await
1✔
2607
            .net
1✔
2608
            .get_peer_standing_from_database(peer_address.ip())
1✔
2609
            .await
1✔
2610
            .unwrap()
1✔
2611
            .standing
1✔
2612
            .is_negative());
1✔
2613
    }
1✔
2614

2615
    #[traced_test]
×
2616
    #[tokio::test]
2617
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
1✔
2618
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2619
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
1✔
2620
        // returned.
1✔
2621

1✔
2622
        let network = Network::Main;
1✔
2623
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2624
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2625
        let genesis_block = Block::genesis(network);
1✔
2626
        let peer_address = get_dummy_socket_address(0);
1✔
2627

1✔
2628
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2629
            &genesis_block,
1✔
2630
            Timestamp::hours(1),
1✔
2631
            StdRng::seed_from_u64(5550001).random(),
1✔
2632
        )
1✔
2633
        .await;
1✔
2634
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2635
            &block_1,
1✔
2636
            Timestamp::hours(1),
1✔
2637
            StdRng::seed_from_u64(5550002).random(),
1✔
2638
        )
1✔
2639
        .await;
1✔
2640
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2641

1✔
2642
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2643
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2644
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2645
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2646
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2647

1✔
2648
        let mock = Mock::new(vec![
1✔
2649
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2650
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
1✔
2651
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
1✔
2652
            Action::Write(PeerMessage::Block(Box::new(
1✔
2653
                block_3_a.clone().try_into().unwrap(),
1✔
2654
            ))),
1✔
2655
            Action::Read(PeerMessage::Bye),
1✔
2656
        ]);
1✔
2657

1✔
2658
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2659
            to_main_tx.clone(),
1✔
2660
            state_lock.clone(),
1✔
2661
            peer_address,
1✔
2662
            hsd,
1✔
2663
            false,
1✔
2664
            1,
1✔
2665
            block_3_a.header().timestamp,
1✔
2666
        );
1✔
2667

1✔
2668
        // This will return error if seen read/write order does not match that of the
1✔
2669
        // mocked object.
1✔
2670
        peer_loop_handler
1✔
2671
            .run_wrapper(mock, from_main_rx_clone)
1✔
2672
            .await?;
1✔
2673

1✔
2674
        Ok(())
1✔
2675
    }
1✔
2676

2677
    #[traced_test]
×
2678
    #[tokio::test]
2679
    async fn receival_of_block_notification_height_1() {
1✔
2680
        // Scenario: client only knows genesis block. Then receives block
1✔
2681
        // notification of height 1. Must request block 1.
1✔
2682
        let network = Network::Main;
1✔
2683
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2684
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
1✔
2685
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2686
                .await
1✔
2687
                .unwrap();
1✔
2688
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2689
        let notification_height1 = (&block_1).into();
1✔
2690
        let mock = Mock::new(vec![
1✔
2691
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
1✔
2692
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
1✔
2693
            Action::Read(PeerMessage::Bye),
1✔
2694
        ]);
1✔
2695

1✔
2696
        let peer_address = get_dummy_socket_address(0);
1✔
2697
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2698
            to_main_tx.clone(),
1✔
2699
            state_lock.clone(),
1✔
2700
            peer_address,
1✔
2701
            hsd,
1✔
2702
            false,
1✔
2703
            1,
1✔
2704
            block_1.header().timestamp,
1✔
2705
        );
1✔
2706
        peer_loop_handler
1✔
2707
            .run_wrapper(mock, from_main_rx_clone)
1✔
2708
            .await
1✔
2709
            .unwrap();
1✔
2710

1✔
2711
        drop(to_main_rx1);
1✔
2712
    }
1✔
2713

2714
    #[traced_test]
×
2715
    #[tokio::test]
2716
    async fn receive_block_request_by_height_block_7() {
1✔
2717
        // Scenario: client only knows blocks up to height 7. Then receives block-
1✔
2718
        // request-by-height for height 7. Must respond with block 7.
1✔
2719
        let network = Network::Main;
1✔
2720
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2721
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
1✔
2722
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2723
                .await
1✔
2724
                .unwrap();
1✔
2725
        let genesis_block = Block::genesis(network);
1✔
2726
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
1✔
2727
            &genesis_block,
1✔
2728
            Timestamp::hours(1),
1✔
2729
            rng.random(),
1✔
2730
        )
1✔
2731
        .await;
1✔
2732
        let block7 = blocks.last().unwrap().to_owned();
1✔
2733
        let tip_height: u64 = block7.header().height.into();
1✔
2734
        assert_eq!(7, tip_height);
1✔
2735

1✔
2736
        for block in &blocks {
8✔
2737
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
7✔
2738
        }
1✔
2739

1✔
2740
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
1✔
2741
        let mock = Mock::new(vec![
1✔
2742
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
1✔
2743
            Action::Write(block7_response),
1✔
2744
            Action::Read(PeerMessage::Bye),
1✔
2745
        ]);
1✔
2746

1✔
2747
        let peer_address = get_dummy_socket_address(0);
1✔
2748
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2749
            to_main_tx.clone(),
1✔
2750
            state_lock.clone(),
1✔
2751
            peer_address,
1✔
2752
            hsd,
1✔
2753
            false,
1✔
2754
            1,
1✔
2755
        );
1✔
2756
        peer_loop_handler
1✔
2757
            .run_wrapper(mock, from_main_rx_clone)
1✔
2758
            .await
1✔
2759
            .unwrap();
1✔
2760

1✔
2761
        drop(to_main_rx1);
1✔
2762
    }
1✔
2763

2764
    #[traced_test]
×
2765
    #[tokio::test]
2766
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
1✔
2767
        // Scenario: client only knows genesis block. Then receives block 1.
1✔
2768

1✔
2769
        let network = Network::Main;
1✔
2770
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2771
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2772
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2773
        let peer_address = get_dummy_socket_address(0);
1✔
2774

1✔
2775
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2776
        let mock = Mock::new(vec![
1✔
2777
            Action::Read(PeerMessage::Block(Box::new(
1✔
2778
                block_1.clone().try_into().unwrap(),
1✔
2779
            ))),
1✔
2780
            Action::Read(PeerMessage::Bye),
1✔
2781
        ]);
1✔
2782

1✔
2783
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2784
            to_main_tx.clone(),
1✔
2785
            state_lock.clone(),
1✔
2786
            peer_address,
1✔
2787
            hsd,
1✔
2788
            false,
1✔
2789
            1,
1✔
2790
            block_1.header().timestamp,
1✔
2791
        );
1✔
2792
        peer_loop_handler
1✔
2793
            .run_wrapper(mock, from_main_rx_clone)
1✔
2794
            .await?;
1✔
2795

1✔
2796
        // Verify that a block was sent to `main_loop`
1✔
2797
        match to_main_rx1.recv().await {
1✔
2798
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
1✔
2799
            _ => bail!("Did not find msg sent to main task"),
1✔
2800
        };
1✔
2801

1✔
2802
        match to_main_rx1.recv().await {
1✔
2803
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2804
            _ => bail!("Must receive remove of peer block max height"),
1✔
2805
        }
1✔
2806

1✔
2807
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2808
            bail!("peer map must be empty after closing connection gracefully");
1✔
2809
        }
1✔
2810

1✔
2811
        Ok(())
1✔
2812
    }
1✔
2813

2814
    #[traced_test]
×
2815
    #[tokio::test]
2816
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
1✔
2817
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
2818
        // receives block 2, meaning that block 1 will have to be requested.
1✔
2819

1✔
2820
        let network = Network::Main;
1✔
2821
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2822
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2823
        let peer_address = get_dummy_socket_address(0);
1✔
2824
        let genesis_block: Block = state_lock
1✔
2825
            .lock_guard()
1✔
2826
            .await
1✔
2827
            .chain
1✔
2828
            .archival_state()
1✔
2829
            .get_tip()
1✔
2830
            .await;
1✔
2831
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
1✔
2832
            &genesis_block,
1✔
2833
            Timestamp::hours(1),
1✔
2834
            StdRng::seed_from_u64(5550001).random(),
1✔
2835
        )
1✔
2836
        .await;
1✔
2837

1✔
2838
        let mock = Mock::new(vec![
1✔
2839
            Action::Read(PeerMessage::Block(Box::new(
1✔
2840
                block_2.clone().try_into().unwrap(),
1✔
2841
            ))),
1✔
2842
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
2843
            Action::Read(PeerMessage::Block(Box::new(
1✔
2844
                block_1.clone().try_into().unwrap(),
1✔
2845
            ))),
1✔
2846
            Action::Read(PeerMessage::Bye),
1✔
2847
        ]);
1✔
2848

1✔
2849
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2850
            to_main_tx.clone(),
1✔
2851
            state_lock.clone(),
1✔
2852
            peer_address,
1✔
2853
            hsd,
1✔
2854
            true,
1✔
2855
            1,
1✔
2856
            block_2.header().timestamp,
1✔
2857
        );
1✔
2858
        peer_loop_handler
1✔
2859
            .run_wrapper(mock, from_main_rx_clone)
1✔
2860
            .await?;
1✔
2861

1✔
2862
        match to_main_rx1.recv().await {
1✔
2863
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
2864
                if blocks[0].hash() != block_1.hash() {
1✔
2865
                    bail!("1st received block by main loop must be block 1");
1✔
2866
                }
1✔
2867
                if blocks[1].hash() != block_2.hash() {
1✔
2868
                    bail!("2nd received block by main loop must be block 2");
1✔
2869
                }
1✔
2870
            }
1✔
2871
            _ => bail!("Did not find msg sent to main task 1"),
1✔
2872
        };
1✔
2873
        match to_main_rx1.recv().await {
1✔
2874
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2875
            _ => bail!("Must receive remove of peer block max height"),
1✔
2876
        }
1✔
2877

1✔
2878
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2879
            bail!("peer map must be empty after closing connection gracefully");
1✔
2880
        }
1✔
2881

1✔
2882
        Ok(())
1✔
2883
    }
1✔
2884

2885
    #[traced_test]
×
2886
    #[tokio::test]
2887
    async fn prevent_ram_exhaustion_test() -> Result<()> {
1✔
2888
        // In this scenario the peer sends more blocks than the client allows to store in the
1✔
2889
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
1✔
2890
        // process as the alternative is that the program will crash because it runs out of RAM.
1✔
2891

1✔
2892
        let network = Network::Main;
1✔
2893
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2894
        let (
1✔
2895
            _peer_broadcast_tx,
1✔
2896
            from_main_rx_clone,
1✔
2897
            to_main_tx,
1✔
2898
            mut to_main_rx1,
1✔
2899
            mut state_lock,
1✔
2900
            _hsd,
1✔
2901
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
2902
        let genesis_block = Block::genesis(network);
1✔
2903

1✔
2904
        // Restrict max number of blocks held in memory to 2.
1✔
2905
        let mut cli = state_lock.cli().clone();
1✔
2906
        cli.sync_mode_threshold = 2;
1✔
2907
        state_lock.set_cli(cli).await;
1✔
2908

1✔
2909
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
1✔
2910
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2911
            &genesis_block,
1✔
2912
            Timestamp::hours(1),
1✔
2913
            rng.random(),
1✔
2914
        )
1✔
2915
        .await;
1✔
2916
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2917

1✔
2918
        let mock = Mock::new(vec![
1✔
2919
            Action::Read(PeerMessage::Block(Box::new(
1✔
2920
                block_4.clone().try_into().unwrap(),
1✔
2921
            ))),
1✔
2922
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
2923
            Action::Read(PeerMessage::Block(Box::new(
1✔
2924
                block_3.clone().try_into().unwrap(),
1✔
2925
            ))),
1✔
2926
            Action::Read(PeerMessage::Bye),
1✔
2927
        ]);
1✔
2928

1✔
2929
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2930
            to_main_tx.clone(),
1✔
2931
            state_lock.clone(),
1✔
2932
            peer_address1,
1✔
2933
            hsd1,
1✔
2934
            true,
1✔
2935
            1,
1✔
2936
            block_4.header().timestamp,
1✔
2937
        );
1✔
2938
        peer_loop_handler
1✔
2939
            .run_wrapper(mock, from_main_rx_clone)
1✔
2940
            .await?;
1✔
2941

1✔
2942
        match to_main_rx1.recv().await {
1✔
2943
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2944
            _ => bail!("Must receive remove of peer block max height"),
1✔
2945
        }
1✔
2946

1✔
2947
        // Verify that no block is sent to main loop.
1✔
2948
        match to_main_rx1.try_recv() {
1✔
2949
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2950
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
1✔
2951
        };
1✔
2952
        drop(to_main_tx);
1✔
2953

1✔
2954
        // Verify that peer is sanctioned for failed fork reconciliation attempt
1✔
2955
        assert!(state_lock
1✔
2956
            .lock_guard()
1✔
2957
            .await
1✔
2958
            .net
1✔
2959
            .get_peer_standing_from_database(peer_address1.ip())
1✔
2960
            .await
1✔
2961
            .unwrap()
1✔
2962
            .standing
1✔
2963
            .is_negative());
1✔
2964

1✔
2965
        Ok(())
1✔
2966
    }
1✔
2967

2968
    #[traced_test]
×
2969
    #[tokio::test]
2970
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
1✔
2971
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
2972
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
1✔
2973

1✔
2974
        let network = Network::Main;
1✔
2975
        let (
1✔
2976
            _peer_broadcast_tx,
1✔
2977
            from_main_rx_clone,
1✔
2978
            to_main_tx,
1✔
2979
            mut to_main_rx1,
1✔
2980
            mut state_lock,
1✔
2981
            hsd,
1✔
2982
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2983
            .await
1✔
2984
            .unwrap();
1✔
2985
        let peer_address: SocketAddr = get_dummy_socket_address(0);
1✔
2986
        let genesis_block = Block::genesis(network);
1✔
2987
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2988
            &genesis_block,
1✔
2989
            Timestamp::hours(1),
1✔
2990
            StdRng::seed_from_u64(5550001).random(),
1✔
2991
        )
1✔
2992
        .await;
1✔
2993
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
1✔
2994

1✔
2995
        let mock = Mock::new(vec![
1✔
2996
            Action::Read(PeerMessage::Block(Box::new(
1✔
2997
                block_4.clone().try_into().unwrap(),
1✔
2998
            ))),
1✔
2999
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3000
            Action::Read(PeerMessage::Block(Box::new(
1✔
3001
                block_3.clone().try_into().unwrap(),
1✔
3002
            ))),
1✔
3003
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3004
            Action::Read(PeerMessage::Block(Box::new(
1✔
3005
                block_2.clone().try_into().unwrap(),
1✔
3006
            ))),
1✔
3007
            Action::Read(PeerMessage::Bye),
1✔
3008
        ]);
1✔
3009

1✔
3010
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3011
            to_main_tx.clone(),
1✔
3012
            state_lock.clone(),
1✔
3013
            peer_address,
1✔
3014
            hsd,
1✔
3015
            true,
1✔
3016
            1,
1✔
3017
            block_4.header().timestamp,
1✔
3018
        );
1✔
3019
        peer_loop_handler
1✔
3020
            .run_wrapper(mock, from_main_rx_clone)
1✔
3021
            .await
1✔
3022
            .unwrap();
1✔
3023

1✔
3024
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
1✔
3025
            panic!("Did not find msg sent to main task");
1✔
3026
        };
1✔
3027
        assert_eq!(blocks[0].hash(), block_2.hash());
1✔
3028
        assert_eq!(blocks[1].hash(), block_3.hash());
1✔
3029
        assert_eq!(blocks[2].hash(), block_4.hash());
1✔
3030

1✔
3031
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
1✔
3032
            panic!("Must receive remove of peer block max height");
1✔
3033
        };
1✔
3034

1✔
3035
        assert!(
1✔
3036
            state_lock.lock_guard().await.net.peer_map.is_empty(),
1✔
3037
            "peer map must be empty after closing connection gracefully"
1✔
3038
        );
1✔
3039
    }
1✔
3040

3041
    #[traced_test]
×
3042
    #[tokio::test]
3043
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
1✔
3044
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
3045
        // receives block 3, meaning that block 2 and 1 will have to be requested.
1✔
3046

1✔
3047
        let network = Network::Main;
1✔
3048
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
3049
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3050
        let peer_address = get_dummy_socket_address(0);
1✔
3051
        let genesis_block = Block::genesis(network);
1✔
3052

1✔
3053
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
1✔
3054
            &genesis_block,
1✔
3055
            Timestamp::hours(1),
1✔
3056
            StdRng::seed_from_u64(5550001).random(),
1✔
3057
        )
1✔
3058
        .await;
1✔
3059

1✔
3060
        let mock = Mock::new(vec![
1✔
3061
            Action::Read(PeerMessage::Block(Box::new(
1✔
3062
                block_3.clone().try_into().unwrap(),
1✔
3063
            ))),
1✔
3064
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3065
            Action::Read(PeerMessage::Block(Box::new(
1✔
3066
                block_2.clone().try_into().unwrap(),
1✔
3067
            ))),
1✔
3068
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
3069
            Action::Read(PeerMessage::Block(Box::new(
1✔
3070
                block_1.clone().try_into().unwrap(),
1✔
3071
            ))),
1✔
3072
            Action::Read(PeerMessage::Bye),
1✔
3073
        ]);
1✔
3074

1✔
3075
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3076
            to_main_tx.clone(),
1✔
3077
            state_lock.clone(),
1✔
3078
            peer_address,
1✔
3079
            hsd,
1✔
3080
            true,
1✔
3081
            1,
1✔
3082
            block_3.header().timestamp,
1✔
3083
        );
1✔
3084
        peer_loop_handler
1✔
3085
            .run_wrapper(mock, from_main_rx_clone)
1✔
3086
            .await?;
1✔
3087

1✔
3088
        match to_main_rx1.recv().await {
1✔
3089
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3090
                if blocks[0].hash() != block_1.hash() {
1✔
3091
                    bail!("1st received block by main loop must be block 1");
1✔
3092
                }
1✔
3093
                if blocks[1].hash() != block_2.hash() {
1✔
3094
                    bail!("2nd received block by main loop must be block 2");
1✔
3095
                }
1✔
3096
                if blocks[2].hash() != block_3.hash() {
1✔
3097
                    bail!("3rd received block by main loop must be block 3");
1✔
3098
                }
1✔
3099
            }
1✔
3100
            _ => bail!("Did not find msg sent to main task"),
1✔
3101
        };
1✔
3102
        match to_main_rx1.recv().await {
1✔
3103
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3104
            _ => bail!("Must receive remove of peer block max height"),
1✔
3105
        }
1✔
3106

1✔
3107
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3108
            bail!("peer map must be empty after closing connection gracefully");
1✔
3109
        }
1✔
3110

1✔
3111
        Ok(())
1✔
3112
    }
1✔
3113

3114
    #[traced_test]
×
3115
    #[tokio::test]
3116
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
1✔
3117
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
3118
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3119
        // But the requests are interrupted by the peer sending another message: a new block
1✔
3120
        // notification.
1✔
3121

1✔
3122
        let network = Network::Main;
1✔
3123
        let (
1✔
3124
            _peer_broadcast_tx,
1✔
3125
            from_main_rx_clone,
1✔
3126
            to_main_tx,
1✔
3127
            mut to_main_rx1,
1✔
3128
            mut state_lock,
1✔
3129
            hsd,
1✔
3130
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3131
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
1✔
3132
        let genesis_block: Block = state_lock
1✔
3133
            .lock_guard()
1✔
3134
            .await
1✔
3135
            .chain
1✔
3136
            .archival_state()
1✔
3137
            .get_tip()
1✔
3138
            .await;
1✔
3139

1✔
3140
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
3141
            fake_valid_sequence_of_blocks_for_tests(
1✔
3142
                &genesis_block,
1✔
3143
                Timestamp::hours(1),
1✔
3144
                StdRng::seed_from_u64(5550001).random(),
1✔
3145
            )
1✔
3146
            .await;
1✔
3147
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3148

1✔
3149
        let mock = Mock::new(vec![
1✔
3150
            Action::Read(PeerMessage::Block(Box::new(
1✔
3151
                block_4.clone().try_into().unwrap(),
1✔
3152
            ))),
1✔
3153
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3154
            Action::Read(PeerMessage::Block(Box::new(
1✔
3155
                block_3.clone().try_into().unwrap(),
1✔
3156
            ))),
1✔
3157
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3158
            //
1✔
3159
            // Now make the interruption of the block reconciliation process
1✔
3160
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
1✔
3161
            //
1✔
3162
            // Complete the block reconciliation process by requesting the last block
1✔
3163
            // in this process, to get back to a mutually known block.
1✔
3164
            Action::Read(PeerMessage::Block(Box::new(
1✔
3165
                block_2.clone().try_into().unwrap(),
1✔
3166
            ))),
1✔
3167
            //
1✔
3168
            // Then anticipate the request of the block that was announced
1✔
3169
            // in the interruption.
1✔
3170
            // Note that we cannot anticipate the response, as only the main
1✔
3171
            // task writes to the database. And the database needs to be updated
1✔
3172
            // for the handling of block 5 to be done correctly.
1✔
3173
            Action::Write(PeerMessage::BlockRequestByHeight(
1✔
3174
                block_5.kernel.header.height,
1✔
3175
            )),
1✔
3176
            Action::Read(PeerMessage::Bye),
1✔
3177
        ]);
1✔
3178

1✔
3179
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3180
            to_main_tx.clone(),
1✔
3181
            state_lock.clone(),
1✔
3182
            peer_socket_address,
1✔
3183
            hsd,
1✔
3184
            false,
1✔
3185
            1,
1✔
3186
            block_5.header().timestamp,
1✔
3187
        );
1✔
3188
        peer_loop_handler
1✔
3189
            .run_wrapper(mock, from_main_rx_clone)
1✔
3190
            .await?;
1✔
3191

1✔
3192
        match to_main_rx1.recv().await {
1✔
3193
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3194
                if blocks[0].hash() != block_2.hash() {
1✔
3195
                    bail!("1st received block by main loop must be block 1");
1✔
3196
                }
1✔
3197
                if blocks[1].hash() != block_3.hash() {
1✔
3198
                    bail!("2nd received block by main loop must be block 2");
1✔
3199
                }
1✔
3200
                if blocks[2].hash() != block_4.hash() {
1✔
3201
                    bail!("3rd received block by main loop must be block 3");
1✔
3202
                }
1✔
3203
            }
1✔
3204
            _ => bail!("Did not find msg sent to main task"),
1✔
3205
        };
1✔
3206
        match to_main_rx1.recv().await {
1✔
3207
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3208
            _ => bail!("Must receive remove of peer block max height"),
1✔
3209
        }
1✔
3210

1✔
3211
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3212
            bail!("peer map must be empty after closing connection gracefully");
1✔
3213
        }
1✔
3214

1✔
3215
        Ok(())
1✔
3216
    }
1✔
3217

3218
    #[traced_test]
×
3219
    #[tokio::test]
3220
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
1✔
3221
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
1✔
3222
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3223
        // But the requests are interrupted by the peer sending another message: a request
1✔
3224
        // for a list of peers.
1✔
3225

1✔
3226
        let network = Network::Main;
1✔
3227
        let (
1✔
3228
            _peer_broadcast_tx,
1✔
3229
            from_main_rx_clone,
1✔
3230
            to_main_tx,
1✔
3231
            mut to_main_rx1,
1✔
3232
            mut state_lock,
1✔
3233
            _hsd,
1✔
3234
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
3235
        let genesis_block = Block::genesis(network);
1✔
3236
        let peer_infos: Vec<PeerInfo> = state_lock
1✔
3237
            .lock_guard()
1✔
3238
            .await
1✔
3239
            .net
1✔
3240
            .peer_map
1✔
3241
            .clone()
1✔
3242
            .into_values()
1✔
3243
            .collect::<Vec<_>>();
1✔
3244

1✔
3245
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
3246
            &genesis_block,
1✔
3247
            Timestamp::hours(1),
1✔
3248
            StdRng::seed_from_u64(5550001).random(),
1✔
3249
        )
1✔
3250
        .await;
1✔
3251
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3252

1✔
3253
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3254
        let expected_peer_list_resp = vec![
1✔
3255
            (
1✔
3256
                peer_infos[0].listen_address().unwrap(),
1✔
3257
                peer_infos[0].instance_id(),
1✔
3258
            ),
1✔
3259
            (sa_1, hsd_1.instance_id),
1✔
3260
        ];
1✔
3261
        let mock = Mock::new(vec![
1✔
3262
            Action::Read(PeerMessage::Block(Box::new(
1✔
3263
                block_4.clone().try_into().unwrap(),
1✔
3264
            ))),
1✔
3265
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3266
            Action::Read(PeerMessage::Block(Box::new(
1✔
3267
                block_3.clone().try_into().unwrap(),
1✔
3268
            ))),
1✔
3269
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3270
            //
1✔
3271
            // Now make the interruption of the block reconciliation process
1✔
3272
            Action::Read(PeerMessage::PeerListRequest),
1✔
3273
            //
1✔
3274
            // Answer the request for a peer list
1✔
3275
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
1✔
3276
            //
1✔
3277
            // Complete the block reconciliation process by requesting the last block
1✔
3278
            // in this process, to get back to a mutually known block.
1✔
3279
            Action::Read(PeerMessage::Block(Box::new(
1✔
3280
                block_2.clone().try_into().unwrap(),
1✔
3281
            ))),
1✔
3282
            Action::Read(PeerMessage::Bye),
1✔
3283
        ]);
1✔
3284

1✔
3285
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3286
            to_main_tx,
1✔
3287
            state_lock.clone(),
1✔
3288
            sa_1,
1✔
3289
            hsd_1,
1✔
3290
            true,
1✔
3291
            1,
1✔
3292
            block_4.header().timestamp,
1✔
3293
        );
1✔
3294
        peer_loop_handler
1✔
3295
            .run_wrapper(mock, from_main_rx_clone)
1✔
3296
            .await?;
1✔
3297

1✔
3298
        // Verify that blocks are sent to `main_loop` in expected ordering
1✔
3299
        match to_main_rx1.recv().await {
1✔
3300
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3301
                if blocks[0].hash() != block_2.hash() {
1✔
3302
                    bail!("1st received block by main loop must be block 1");
1✔
3303
                }
1✔
3304
                if blocks[1].hash() != block_3.hash() {
1✔
3305
                    bail!("2nd received block by main loop must be block 2");
1✔
3306
                }
1✔
3307
                if blocks[2].hash() != block_4.hash() {
1✔
3308
                    bail!("3rd received block by main loop must be block 3");
1✔
3309
                }
1✔
3310
            }
1✔
3311
            _ => bail!("Did not find msg sent to main task"),
1✔
3312
        };
1✔
3313

1✔
3314
        assert_eq!(
1✔
3315
            1,
1✔
3316
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
3317
            "One peer must remain in peer list after peer_1 closed gracefully"
1✔
3318
        );
1✔
3319

1✔
3320
        Ok(())
1✔
3321
    }
1✔
3322

3323
    #[traced_test]
×
3324
    #[tokio::test]
3325
    async fn receive_transaction_request() {
1✔
3326
        let network = Network::Main;
1✔
3327
        let dummy_tx = invalid_empty_single_proof_transaction();
1✔
3328
        let txid = dummy_tx.kernel.txid();
1✔
3329

1✔
3330
        for transaction_is_known in [false, true] {
3✔
3331
            let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
2✔
3332
                get_test_genesis_setup(network, 1, cli_args::Args::default())
2✔
3333
                    .await
2✔
3334
                    .unwrap();
2✔
3335
            if transaction_is_known {
2✔
3336
                state_lock
1✔
3337
                    .lock_guard_mut()
1✔
3338
                    .await
1✔
3339
                    .mempool_insert(dummy_tx.clone(), TransactionOrigin::Own)
1✔
3340
                    .await;
1✔
3341
            }
1✔
3342

1✔
3343
            let mock = if transaction_is_known {
2✔
3344
                Mock::new(vec![
1✔
3345
                    Action::Read(PeerMessage::TransactionRequest(txid)),
1✔
3346
                    Action::Write(PeerMessage::Transaction(Box::new(
1✔
3347
                        (&dummy_tx).try_into().unwrap(),
1✔
3348
                    ))),
1✔
3349
                    Action::Read(PeerMessage::Bye),
1✔
3350
                ])
1✔
3351
            } else {
1✔
3352
                Mock::new(vec![
1✔
3353
                    Action::Read(PeerMessage::TransactionRequest(txid)),
1✔
3354
                    Action::Read(PeerMessage::Bye),
1✔
3355
                ])
1✔
3356
            };
1✔
3357

1✔
3358
            let hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3359
            let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
2✔
3360
            let mut peer_loop_handler = PeerLoopHandler::new(
2✔
3361
                to_main_tx,
2✔
3362
                state_lock,
2✔
3363
                get_dummy_socket_address(0),
2✔
3364
                hsd,
2✔
3365
                true,
2✔
3366
                1,
2✔
3367
            );
2✔
3368

2✔
3369
            peer_loop_handler
2✔
3370
                .run(mock, from_main_rx, &mut peer_state)
2✔
3371
                .await
2✔
3372
                .unwrap();
2✔
3373
        }
1✔
3374
    }
1✔
3375

3376
    #[traced_test]
×
3377
    #[tokio::test]
3378
    async fn empty_mempool_request_tx_test() {
1✔
3379
        // In this scenario the client receives a transaction notification from
1✔
3380
        // a peer of a transaction it doesn't know; the client must then request it.
1✔
3381

1✔
3382
        let network = Network::Main;
1✔
3383
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
1✔
3384
            get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3385
                .await
1✔
3386
                .unwrap();
1✔
3387

1✔
3388
        let spending_key = state_lock
1✔
3389
            .lock_guard()
1✔
3390
            .await
1✔
3391
            .wallet_state
1✔
3392
            .wallet_entropy
1✔
3393
            .nth_symmetric_key_for_tests(0);
1✔
3394
        let genesis_block = Block::genesis(network);
1✔
3395
        let now = genesis_block.kernel.header.timestamp;
1✔
3396
        let (transaction_1, _, _change_output) = state_lock
1✔
3397
            .lock_guard()
1✔
3398
            .await
1✔
3399
            .create_transaction_with_prover_capability(
1✔
3400
                Default::default(),
1✔
3401
                spending_key.into(),
1✔
3402
                UtxoNotificationMedium::OffChain,
1✔
3403
                NativeCurrencyAmount::coins(0),
1✔
3404
                now,
1✔
3405
                TxProvingCapability::ProofCollection,
1✔
3406
                &TritonVmJobQueue::dummy(),
1✔
3407
            )
1✔
3408
            .await
1✔
3409
            .unwrap();
1✔
3410

1✔
3411
        // Build the resulting transaction notification
1✔
3412
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3413
        let mock = Mock::new(vec![
1✔
3414
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3415
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3416
            Action::Read(PeerMessage::Transaction(Box::new(
1✔
3417
                (&transaction_1).try_into().unwrap(),
1✔
3418
            ))),
1✔
3419
            Action::Read(PeerMessage::Bye),
1✔
3420
        ]);
1✔
3421

1✔
3422
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3423

1✔
3424
        // Mock a timestamp to allow transaction to be considered valid
1✔
3425
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3426
            to_main_tx,
1✔
3427
            state_lock.clone(),
1✔
3428
            get_dummy_socket_address(0),
1✔
3429
            hsd_1.clone(),
1✔
3430
            true,
1✔
3431
            1,
1✔
3432
            now,
1✔
3433
        );
1✔
3434

1✔
3435
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3436

1✔
3437
        assert!(
1✔
3438
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3439
            "Mempool must be empty at init"
1✔
3440
        );
1✔
3441
        peer_loop_handler
1✔
3442
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3443
            .await
1✔
3444
            .unwrap();
1✔
3445

1✔
3446
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
1✔
3447
        // by the `main_loop`.
1✔
3448
        match to_main_rx1.recv().await {
1✔
3449
            Some(PeerTaskToMain::Transaction(_)) => (),
1✔
3450
            _ => panic!("Must receive remove of peer block max height"),
1✔
3451
        };
1✔
3452
    }
1✔
3453

3454
    #[traced_test]
×
3455
    #[tokio::test]
3456
    async fn populated_mempool_request_tx_test() -> Result<()> {
1✔
3457
        // In this scenario the peer is informed of a transaction that it already knows
1✔
3458

1✔
3459
        let network = Network::Main;
1✔
3460
        let (
1✔
3461
            _peer_broadcast_tx,
1✔
3462
            from_main_rx_clone,
1✔
3463
            to_main_tx,
1✔
3464
            mut to_main_rx1,
1✔
3465
            mut state_lock,
1✔
3466
            _hsd,
1✔
3467
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3468
            .await
1✔
3469
            .unwrap();
1✔
3470
        let spending_key = state_lock
1✔
3471
            .lock_guard()
1✔
3472
            .await
1✔
3473
            .wallet_state
1✔
3474
            .wallet_entropy
1✔
3475
            .nth_symmetric_key_for_tests(0);
1✔
3476

1✔
3477
        let genesis_block = Block::genesis(network);
1✔
3478
        let now = genesis_block.kernel.header.timestamp;
1✔
3479
        let (transaction_1, _, _change_output) = state_lock
1✔
3480
            .lock_guard()
1✔
3481
            .await
1✔
3482
            .create_transaction_with_prover_capability(
1✔
3483
                Default::default(),
1✔
3484
                spending_key.into(),
1✔
3485
                UtxoNotificationMedium::OffChain,
1✔
3486
                NativeCurrencyAmount::coins(0),
1✔
3487
                now,
1✔
3488
                TxProvingCapability::ProofCollection,
1✔
3489
                &TritonVmJobQueue::dummy(),
1✔
3490
            )
1✔
3491
            .await
1✔
3492
            .unwrap();
1✔
3493

1✔
3494
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3495
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
3496
            to_main_tx,
1✔
3497
            state_lock.clone(),
1✔
3498
            get_dummy_socket_address(0),
1✔
3499
            hsd_1.clone(),
1✔
3500
            true,
1✔
3501
            1,
1✔
3502
        );
1✔
3503
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3504

1✔
3505
        assert!(
1✔
3506
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3507
            "Mempool must be empty at init"
1✔
3508
        );
1✔
3509
        state_lock
1✔
3510
            .lock_guard_mut()
1✔
3511
            .await
1✔
3512
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
1✔
3513
            .await;
1✔
3514
        assert!(
1✔
3515
            !state_lock.lock_guard().await.mempool.is_empty(),
1✔
3516
            "Mempool must be non-empty after insertion"
1✔
3517
        );
1✔
3518

1✔
3519
        // Run the peer loop and verify expected exchange -- namely that the
1✔
3520
        // tx notification is received and the the transaction is *not*
1✔
3521
        // requested.
1✔
3522
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3523
        let mock = Mock::new(vec![
1✔
3524
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3525
            Action::Read(PeerMessage::Bye),
1✔
3526
        ]);
1✔
3527
        peer_loop_handler
1✔
3528
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3529
            .await
1✔
3530
            .unwrap();
1✔
3531

1✔
3532
        // nothing is allowed to be sent to `main_loop`
1✔
3533
        match to_main_rx1.try_recv() {
1✔
3534
            Err(TryRecvError::Empty) => (),
1✔
3535
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
1✔
3536
            Ok(_) => panic!("to_main channel must be empty"),
1✔
3537
        };
1✔
3538
        Ok(())
1✔
3539
    }
1✔
3540

3541
    mod block_proposals {
3542
        use super::*;
3543
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3544

3545
        struct TestSetup {
3546
            peer_loop_handler: PeerLoopHandler,
3547
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3548
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3549
            peer_state: MutablePeerState,
3550
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3551
            genesis_block: Block,
3552
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3553
        }
3554

3555
        async fn genesis_setup(network: Network) -> TestSetup {
2✔
3556
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
2✔
3557
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2✔
3558
                    .await
2✔
3559
                    .unwrap();
2✔
3560
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3561
            let peer_loop_handler = PeerLoopHandler::new(
2✔
3562
                to_main_tx.clone(),
2✔
3563
                alice.clone(),
2✔
3564
                get_dummy_socket_address(0),
2✔
3565
                peer_hsd.clone(),
2✔
3566
                true,
2✔
3567
                1,
2✔
3568
            );
2✔
3569
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
2✔
3570

2✔
3571
            // (peer_loop_handler, to_main_rx1)
2✔
3572
            TestSetup {
2✔
3573
                peer_broadcast_tx,
2✔
3574
                peer_loop_handler,
2✔
3575
                to_main_rx,
2✔
3576
                from_main_rx,
2✔
3577
                peer_state,
2✔
3578
                to_main_tx,
2✔
3579
                genesis_block: Block::genesis(network),
2✔
3580
            }
2✔
3581
        }
2✔
3582

3583
        #[traced_test]
×
3584
        #[tokio::test]
3585
        async fn accept_block_proposal_height_one() {
1✔
3586
            // Node knows genesis block, receives a block proposal for block 1
1✔
3587
            // and must accept this. Verify that main loop is informed of block
1✔
3588
            // proposal.
1✔
3589
            let TestSetup {
1✔
3590
                peer_broadcast_tx,
1✔
3591
                mut peer_loop_handler,
1✔
3592
                mut to_main_rx,
1✔
3593
                from_main_rx,
1✔
3594
                mut peer_state,
1✔
3595
                to_main_tx,
1✔
3596
                genesis_block,
1✔
3597
            } = genesis_setup(Network::Main).await;
1✔
3598
            let block1 = fake_valid_block_for_tests(
1✔
3599
                &peer_loop_handler.global_state_lock,
1✔
3600
                StdRng::seed_from_u64(5550001).random(),
1✔
3601
            )
1✔
3602
            .await;
1✔
3603

1✔
3604
            let mock = Mock::new(vec![
1✔
3605
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
1✔
3606
                Action::Read(PeerMessage::Bye),
1✔
3607
            ]);
1✔
3608
            peer_loop_handler
1✔
3609
                .run(mock, from_main_rx, &mut peer_state)
1✔
3610
                .await
1✔
3611
                .unwrap();
1✔
3612

1✔
3613
            match to_main_rx.try_recv().unwrap() {
1✔
3614
                PeerTaskToMain::BlockProposal(block) => {
1✔
3615
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
1✔
3616
                }
1✔
3617
                _ => panic!("Expected main loop to be informed of block proposal"),
1✔
3618
            };
1✔
3619

1✔
3620
            drop(to_main_tx);
1✔
3621
            drop(peer_broadcast_tx);
1✔
3622
        }
1✔
3623

3624
        #[traced_test]
×
3625
        #[tokio::test]
3626
        async fn accept_block_proposal_notification_height_one() {
1✔
3627
            // Node knows genesis block, receives a block proposal notification
1✔
3628
            // for block 1 and must accept this by requesting the block
1✔
3629
            // proposal from peer.
1✔
3630
            let TestSetup {
1✔
3631
                peer_broadcast_tx,
1✔
3632
                mut peer_loop_handler,
1✔
3633
                to_main_rx: _,
1✔
3634
                from_main_rx,
1✔
3635
                mut peer_state,
1✔
3636
                to_main_tx,
1✔
3637
                ..
1✔
3638
            } = genesis_setup(Network::Main).await;
1✔
3639
            let block1 = fake_valid_block_for_tests(
1✔
3640
                &peer_loop_handler.global_state_lock,
1✔
3641
                StdRng::seed_from_u64(5550001).random(),
1✔
3642
            )
1✔
3643
            .await;
1✔
3644

1✔
3645
            let mock = Mock::new(vec![
1✔
3646
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
1✔
3647
                Action::Write(PeerMessage::BlockProposalRequest(
1✔
3648
                    BlockProposalRequest::new(block1.body().mast_hash()),
1✔
3649
                )),
1✔
3650
                Action::Read(PeerMessage::Bye),
1✔
3651
            ]);
1✔
3652
            peer_loop_handler
1✔
3653
                .run(mock, from_main_rx, &mut peer_state)
1✔
3654
                .await
1✔
3655
                .unwrap();
1✔
3656

1✔
3657
            drop(to_main_tx);
1✔
3658
            drop(peer_broadcast_tx);
1✔
3659
        }
1✔
3660
    }
3661

3662
    mod proof_qualities {
3663
        use strum::IntoEnumIterator;
3664

3665
        use super::*;
3666
        use crate::config_models::cli_args;
3667
        use crate::models::blockchain::transaction::Transaction;
3668
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3669
        use crate::tests::shared::mock_genesis_global_state;
3670

3671
        async fn tx_of_proof_quality(
2✔
3672
            network: Network,
2✔
3673
            quality: TransactionProofQuality,
2✔
3674
        ) -> Transaction {
2✔
3675
            let wallet_secret = WalletEntropy::devnet_wallet();
2✔
3676
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
2✔
3677
            let alice =
2✔
3678
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
2✔
3679
                    .await;
2✔
3680
            let alice = alice.lock_guard().await;
2✔
3681
            let genesis_block = alice.chain.light_state();
2✔
3682
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
2✔
3683
            let prover_capability = match quality {
2✔
3684
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
1✔
3685
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
1✔
3686
            };
3687
            alice
2✔
3688
                .create_transaction_with_prover_capability(
2✔
3689
                    vec![].into(),
2✔
3690
                    alice_key.into(),
2✔
3691
                    UtxoNotificationMedium::OffChain,
2✔
3692
                    NativeCurrencyAmount::coins(1),
2✔
3693
                    in_seven_months,
2✔
3694
                    prover_capability,
2✔
3695
                    &TritonVmJobQueue::dummy(),
2✔
3696
                )
2✔
3697
                .await
2✔
3698
                .unwrap()
2✔
3699
                .0
2✔
3700
        }
2✔
3701

3702
        #[traced_test]
×
3703
        #[tokio::test]
3704
        async fn client_favors_higher_proof_quality() {
1✔
3705
            // In this scenario the peer is informed of a transaction that it
1✔
3706
            // already knows, and it's tested that it checks the proof quality
1✔
3707
            // field and verifies that it exceeds the proof in the mempool
1✔
3708
            // before requesting the transasction.
1✔
3709
            let network = Network::Main;
1✔
3710
            let proof_collection_tx =
1✔
3711
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
1✔
3712
            let single_proof_tx =
1✔
3713
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
1✔
3714

1✔
3715
            for (own_tx_pq, new_tx_pq) in
4✔
3716
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
1✔
3717
            {
1✔
3718
                use TransactionProofQuality::*;
1✔
3719

1✔
3720
                let (
1✔
3721
                    _peer_broadcast_tx,
4✔
3722
                    from_main_rx_clone,
4✔
3723
                    to_main_tx,
4✔
3724
                    mut to_main_rx1,
4✔
3725
                    mut alice,
4✔
3726
                    handshake_data,
4✔
3727
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
4✔
3728
                    .await
4✔
3729
                    .unwrap();
4✔
3730

1✔
3731
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
4✔
3732
                    (ProofCollection, ProofCollection) => {
1✔
3733
                        (&proof_collection_tx, &proof_collection_tx)
1✔
3734
                    }
1✔
3735
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
1✔
3736
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
1✔
3737
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
1✔
3738
                };
1✔
3739

1✔
3740
                alice
4✔
3741
                    .lock_guard_mut()
4✔
3742
                    .await
4✔
3743
                    .mempool_insert(own_tx.to_owned(), TransactionOrigin::Foreign)
4✔
3744
                    .await;
4✔
3745

1✔
3746
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
4✔
3747

4✔
3748
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4✔
3749
                let mock = if own_proof_is_supreme {
4✔
3750
                    Mock::new(vec![
3✔
3751
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3✔
3752
                        Action::Read(PeerMessage::Bye),
3✔
3753
                    ])
3✔
3754
                } else {
1✔
3755
                    Mock::new(vec![
1✔
3756
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3757
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3758
                        Action::Read(PeerMessage::Transaction(Box::new(
1✔
3759
                            new_tx.try_into().unwrap(),
1✔
3760
                        ))),
1✔
3761
                        Action::Read(PeerMessage::Bye),
1✔
3762
                    ])
1✔
3763
                };
1✔
3764

1✔
3765
                let now = proof_collection_tx.kernel.timestamp;
4✔
3766
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4✔
3767
                    to_main_tx,
4✔
3768
                    alice.clone(),
4✔
3769
                    get_dummy_socket_address(0),
4✔
3770
                    handshake_data.clone(),
4✔
3771
                    true,
4✔
3772
                    1,
4✔
3773
                    now,
4✔
3774
                );
4✔
3775
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4✔
3776

4✔
3777
                peer_loop_handler
4✔
3778
                    .run(mock, from_main_rx_clone, &mut peer_state)
4✔
3779
                    .await
4✔
3780
                    .unwrap();
4✔
3781

4✔
3782
                if own_proof_is_supreme {
4✔
3783
                    match to_main_rx1.try_recv() {
3✔
3784
                        Err(TryRecvError::Empty) => (),
3✔
3785
                        Err(TryRecvError::Disconnected) => {
1✔
3786
                            panic!("to_main channel must still be open")
1✔
3787
                        }
1✔
3788
                        Ok(_) => panic!("to_main channel must be empty"),
1✔
3789
                    }
1✔
3790
                } else {
1✔
3791
                    match to_main_rx1.try_recv() {
1✔
3792
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
1✔
3793
                        Err(TryRecvError::Disconnected) => {
1✔
3794
                            panic!("to_main channel must still be open")
1✔
3795
                        }
1✔
3796
                        Ok(PeerTaskToMain::Transaction(_)) => (),
1✔
3797
                        _ => panic!("Unexpected result from channel"),
1✔
3798
                    }
1✔
3799
                }
1✔
3800
            }
1✔
3801
        }
1✔
3802
    }
3803

3804
    mod sync_challenges {
3805
        use super::*;
3806
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3807

3808
        #[traced_test]
×
3809
        #[tokio::test]
3810
        async fn bad_sync_challenge_height_greater_than_tip() {
1✔
3811
            // Criterium: Challenge height may not exceed that of tip in the
1✔
3812
            // request.
1✔
3813

1✔
3814
            let network = Network::Main;
1✔
3815
            let (
1✔
3816
                _alice_main_to_peer_tx,
1✔
3817
                alice_main_to_peer_rx,
1✔
3818
                alice_peer_to_main_tx,
1✔
3819
                alice_peer_to_main_rx,
1✔
3820
                mut alice,
1✔
3821
                alice_hsd,
1✔
3822
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
3823
                .await
1✔
3824
                .unwrap();
1✔
3825
            let genesis_block: Block = Block::genesis(network);
1✔
3826
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
1✔
3827
                &genesis_block,
1✔
3828
                Timestamp::hours(1),
1✔
3829
                [0u8; 32],
1✔
3830
            )
1✔
3831
            .await;
1✔
3832
            for block in &blocks {
12✔
3833
                alice.set_new_tip(block.clone()).await.unwrap();
11✔
3834
            }
1✔
3835

1✔
3836
            let bh12 = blocks.last().unwrap().header().height;
1✔
3837
            let sync_challenge = SyncChallenge {
1✔
3838
                tip_digest: blocks[9].hash(),
1✔
3839
                challenges: [bh12; 10],
1✔
3840
            };
1✔
3841
            let alice_p2p_messages = Mock::new(vec![
1✔
3842
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3843
                Action::Read(PeerMessage::Bye),
1✔
3844
            ]);
1✔
3845

1✔
3846
            let peer_address = get_dummy_socket_address(0);
1✔
3847
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3848
                alice_peer_to_main_tx.clone(),
1✔
3849
                alice.clone(),
1✔
3850
                peer_address,
1✔
3851
                alice_hsd,
1✔
3852
                false,
1✔
3853
                1,
1✔
3854
            );
1✔
3855
            alice_peer_loop_handler
1✔
3856
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3857
                .await
1✔
3858
                .unwrap();
1✔
3859

1✔
3860
            drop(alice_peer_to_main_rx);
1✔
3861

1✔
3862
            let latest_sanction = alice
1✔
3863
                .lock_guard()
1✔
3864
                .await
1✔
3865
                .net
1✔
3866
                .get_peer_standing_from_database(peer_address.ip())
1✔
3867
                .await
1✔
3868
                .unwrap();
1✔
3869
            assert_eq!(
1✔
3870
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3871
                latest_sanction
1✔
3872
                    .latest_punishment
1✔
3873
                    .expect("peer must be sanctioned")
1✔
3874
                    .0
1✔
3875
            );
1✔
3876
        }
1✔
3877

3878
        #[traced_test]
×
3879
        #[tokio::test]
3880
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
1✔
3881
            // Criterium: Challenge may not point to genesis block, or block 1, as
1✔
3882
            // tip.
1✔
3883

1✔
3884
            let network = Network::Main;
1✔
3885
            let genesis_block: Block = Block::genesis(network);
1✔
3886

1✔
3887
            let alice_cli = cli_args::Args::default();
1✔
3888
            let (
1✔
3889
                _alice_main_to_peer_tx,
1✔
3890
                alice_main_to_peer_rx,
1✔
3891
                alice_peer_to_main_tx,
1✔
3892
                alice_peer_to_main_rx,
1✔
3893
                alice,
1✔
3894
                alice_hsd,
1✔
3895
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
1✔
3896

1✔
3897
            let sync_challenge = SyncChallenge {
1✔
3898
                tip_digest: genesis_block.hash(),
1✔
3899
                challenges: [BlockHeight::genesis(); 10],
1✔
3900
            };
1✔
3901

1✔
3902
            let alice_p2p_messages = Mock::new(vec![
1✔
3903
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3904
                Action::Read(PeerMessage::Bye),
1✔
3905
            ]);
1✔
3906

1✔
3907
            let peer_address = get_dummy_socket_address(0);
1✔
3908
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3909
                alice_peer_to_main_tx.clone(),
1✔
3910
                alice.clone(),
1✔
3911
                peer_address,
1✔
3912
                alice_hsd,
1✔
3913
                false,
1✔
3914
                1,
1✔
3915
            );
1✔
3916
            alice_peer_loop_handler
1✔
3917
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3918
                .await
1✔
3919
                .unwrap();
1✔
3920

1✔
3921
            drop(alice_peer_to_main_rx);
1✔
3922

1✔
3923
            let latest_sanction = alice
1✔
3924
                .lock_guard()
1✔
3925
                .await
1✔
3926
                .net
1✔
3927
                .get_peer_standing_from_database(peer_address.ip())
1✔
3928
                .await
1✔
3929
                .unwrap();
1✔
3930
            assert_eq!(
1✔
3931
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3932
                latest_sanction
1✔
3933
                    .latest_punishment
1✔
3934
                    .expect("peer must be sanctioned")
1✔
3935
                    .0
1✔
3936
            );
1✔
3937
        }
1✔
3938

3939
        #[traced_test]
×
3940
        #[tokio::test]
3941
        async fn sync_challenge_happy_path() -> Result<()> {
1✔
3942
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
1✔
3943
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
1✔
3944
            // sync mode.
1✔
3945

1✔
3946
            let mut rng = rand::rng();
1✔
3947
            let network = Network::Main;
1✔
3948
            let genesis_block: Block = Block::genesis(network);
1✔
3949

1✔
3950
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
1✔
3951
            let alice_cli = cli_args::Args {
1✔
3952
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
1✔
3953
                ..Default::default()
1✔
3954
            };
1✔
3955
            let (
1✔
3956
                _alice_main_to_peer_tx,
1✔
3957
                alice_main_to_peer_rx,
1✔
3958
                alice_peer_to_main_tx,
1✔
3959
                mut alice_peer_to_main_rx,
1✔
3960
                mut alice,
1✔
3961
                alice_hsd,
1✔
3962
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
1✔
3963
            let _alice_socket_address = get_dummy_socket_address(0);
1✔
3964

1✔
3965
            let (
1✔
3966
                _bob_main_to_peer_tx,
1✔
3967
                _bob_main_to_peer_rx,
1✔
3968
                _bob_peer_to_main_tx,
1✔
3969
                _bob_peer_to_main_rx,
1✔
3970
                mut bob,
1✔
3971
                _bob_hsd,
1✔
3972
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3973
            let bob_socket_address = get_dummy_socket_address(0);
1✔
3974

1✔
3975
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
3976
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
1✔
3977
            assert!(
1✔
3978
                block_1.is_valid(&genesis_block, now).await,
1✔
3979
                "Block must be valid for this test to make sense"
1✔
3980
            );
1✔
3981
            let alice_tip = &block_1;
1✔
3982
            alice.set_new_tip(block_1.clone()).await?;
1✔
3983
            bob.set_new_tip(block_1.clone()).await?;
1✔
3984

1✔
3985
            // produce enough blocks to ensure alice needs to go into sync mode
1✔
3986
            // with this block notification.
1✔
3987
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
1✔
3988
                &block_1,
1✔
3989
                TARGET_BLOCK_INTERVAL,
1✔
3990
                rng.random(),
1✔
3991
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
1✔
3992
            )
1✔
3993
            .await;
1✔
3994
            for block in &blocks {
16✔
3995
                bob.set_new_tip(block.clone()).await?;
15✔
3996
            }
1✔
3997
            let bob_tip = blocks.last().unwrap();
1✔
3998

1✔
3999
            let block_notification_from_bob = PeerBlockNotification {
1✔
4000
                hash: bob_tip.hash(),
1✔
4001
                height: bob_tip.header().height,
1✔
4002
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
1✔
4003
            };
1✔
4004

1✔
4005
            let alice_rng_seed = rng.random::<[u8; 32]>();
1✔
4006
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
1✔
4007
            let sync_challenge_from_alice = SyncChallenge::generate(
1✔
4008
                &block_notification_from_bob,
1✔
4009
                alice_tip.header().height,
1✔
4010
                alice_rng_clone.random(),
1✔
4011
            );
1✔
4012

1✔
4013
            println!(
1✔
4014
                "sync challenge from alice:\n{:?}",
1✔
4015
                sync_challenge_from_alice
1✔
4016
            );
1✔
4017

1✔
4018
            let sync_challenge_response_from_bob = bob
1✔
4019
                .lock_guard()
1✔
4020
                .await
1✔
4021
                .response_to_sync_challenge(sync_challenge_from_alice)
1✔
4022
                .await
1✔
4023
                .expect("should be able to respond to sync challenge");
1✔
4024

1✔
4025
            let alice_p2p_messages = Mock::new(vec![
1✔
4026
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4027
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
1✔
4028
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
1✔
4029
                    sync_challenge_response_from_bob,
1✔
4030
                ))),
1✔
4031
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4032
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
1✔
4033
                // The absence of a Write here checks that a 2nd challenge isn't sent
1✔
4034
                // when a successful was just received.
1✔
4035
                Action::Read(PeerMessage::Bye),
1✔
4036
            ]);
1✔
4037

1✔
4038
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
4039
                alice_peer_to_main_tx.clone(),
1✔
4040
                alice.clone(),
1✔
4041
                bob_socket_address,
1✔
4042
                alice_hsd,
1✔
4043
                false,
1✔
4044
                1,
1✔
4045
                bob_tip.header().timestamp,
1✔
4046
            );
1✔
4047
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
1✔
4048
            alice_peer_loop_handler
1✔
4049
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
4050
                .await?;
1✔
4051

1✔
4052
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
1✔
4053
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
1✔
4054
            expected_anchor_mmra.append(bob_tip.hash());
1✔
4055
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
4056
                peer_address: bob_socket_address,
1✔
4057
                claimed_height: bob_tip.header().height,
1✔
4058
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
1✔
4059
                claimed_block_mmra: expected_anchor_mmra,
1✔
4060
            };
1✔
4061
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
1✔
4062
            assert_eq!(
1✔
4063
                expected_message_from_alice_peer_loop,
1✔
4064
                observed_message_from_alice_peer_loop
1✔
4065
            );
1✔
4066

1✔
4067
            Ok(())
1✔
4068
        }
1✔
4069
    }
4070
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc