• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 14119571714

28 Mar 2025 12:45AM UTC coverage: 84.479% (+0.09%) from 84.386%
14119571714

push

github

Sword-Smith
fix(peer_loop): Don't hold read lock responding with transction

Prior to this commit, a read-lock was held while a message was sent to a
peer. That is totally disallowed since the connection can timeout in
which case this read lock will be held for up to two minutes.

Any and all locks must be released before sending anything to a peer!

Cf. #534.

58 of 61 new or added lines in 1 file covered. (95.08%)

4 existing lines in 3 files now uncovered.

51882 of 61414 relevant lines covered (84.48%)

175454.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.9
/src/peer_loop.rs
1
use std::cmp;
2
use std::marker::Unpin;
3
use std::net::SocketAddr;
4
use std::time::SystemTime;
5

6
use anyhow::bail;
7
use anyhow::Result;
8
use chrono::DateTime;
9
use chrono::Utc;
10
use futures::sink::Sink;
11
use futures::sink::SinkExt;
12
use futures::stream::TryStream;
13
use futures::stream::TryStreamExt;
14
use itertools::Itertools;
15
use rand::rngs::StdRng;
16
use rand::Rng;
17
use rand::SeedableRng;
18
use tasm_lib::triton_vm::prelude::Digest;
19
use tasm_lib::twenty_first::prelude::Mmr;
20
use tasm_lib::twenty_first::prelude::MmrMembershipProof;
21
use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
22
use tokio::select;
23
use tokio::sync::broadcast;
24
use tokio::sync::mpsc;
25
use tracing::debug;
26
use tracing::error;
27
use tracing::info;
28
use tracing::warn;
29

30
use crate::connect_to_peers::close_peer_connected_callback;
31
use crate::macros::fn_name;
32
use crate::macros::log_slow_scope;
33
use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST;
34
use crate::models::blockchain::block::block_height::BlockHeight;
35
use crate::models::blockchain::block::mutator_set_update::MutatorSetUpdate;
36
use crate::models::blockchain::block::Block;
37
use crate::models::blockchain::transaction::transaction_kernel::TransactionConfirmabilityError;
38
use crate::models::blockchain::transaction::Transaction;
39
use crate::models::channel::MainToPeerTask;
40
use crate::models::channel::PeerTaskToMain;
41
use crate::models::channel::PeerTaskToMainTransaction;
42
use crate::models::peer::handshake_data::HandshakeData;
43
use crate::models::peer::peer_info::PeerConnectionInfo;
44
use crate::models::peer::peer_info::PeerInfo;
45
use crate::models::peer::transfer_block::TransferBlock;
46
use crate::models::peer::BlockProposalRequest;
47
use crate::models::peer::BlockRequestBatch;
48
use crate::models::peer::IssuedSyncChallenge;
49
use crate::models::peer::MutablePeerState;
50
use crate::models::peer::NegativePeerSanction;
51
use crate::models::peer::PeerMessage;
52
use crate::models::peer::PeerSanction;
53
use crate::models::peer::PeerStanding;
54
use crate::models::peer::PositivePeerSanction;
55
use crate::models::peer::SyncChallenge;
56
use crate::models::proof_abstractions::mast_hash::MastHash;
57
use crate::models::proof_abstractions::timestamp::Timestamp;
58
use crate::models::state::block_proposal::BlockProposalRejectError;
59
use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD;
60
use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS;
61
use crate::models::state::GlobalState;
62
use crate::models::state::GlobalStateLock;
63
use crate::util_types::mutator_set::removal_record::RemovalRecordValidityError;
64

65
const STANDARD_BLOCK_BATCH_SIZE: usize = 250;
66
const MAX_PEER_LIST_LENGTH: usize = 10;
67
const MINIMUM_BLOCK_BATCH_SIZE: usize = 2;
68

69
const KEEP_CONNECTION_ALIVE: bool = false;
70
const DISCONNECT_CONNECTION: bool = true;
71

72
pub type PeerStandingNumber = i32;
73

74
/// Handles messages from peers via TCP
75
///
76
/// also handles messages from main task over the main-to-peer-tasks broadcast
77
/// channel.
78
#[derive(Debug, Clone)]
79
pub struct PeerLoopHandler {
80
    to_main_tx: mpsc::Sender<PeerTaskToMain>,
81
    global_state_lock: GlobalStateLock,
82
    peer_address: SocketAddr,
83
    peer_handshake_data: HandshakeData,
84
    inbound_connection: bool,
85
    distance: u8,
86
    rng: StdRng,
87
    #[cfg(test)]
88
    mock_now: Option<Timestamp>,
89
}
90

91
impl PeerLoopHandler {
92
    pub(crate) fn new(
22✔
93
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
22✔
94
        global_state_lock: GlobalStateLock,
22✔
95
        peer_address: SocketAddr,
22✔
96
        peer_handshake_data: HandshakeData,
22✔
97
        inbound_connection: bool,
22✔
98
        distance: u8,
22✔
99
    ) -> Self {
22✔
100
        Self {
22✔
101
            to_main_tx,
22✔
102
            global_state_lock,
22✔
103
            peer_address,
22✔
104
            peer_handshake_data,
22✔
105
            inbound_connection,
22✔
106
            distance,
22✔
107
            rng: StdRng::from_rng(&mut rand::rng()),
22✔
108
            #[cfg(test)]
22✔
109
            mock_now: None,
22✔
110
        }
22✔
111
    }
22✔
112

113
    /// Allows for mocked timestamps such that time dependencies may be tested.
114
    #[cfg(test)]
115
    pub(crate) fn with_mocked_time(
20✔
116
        to_main_tx: mpsc::Sender<PeerTaskToMain>,
20✔
117
        global_state_lock: GlobalStateLock,
20✔
118
        peer_address: SocketAddr,
20✔
119
        peer_handshake_data: HandshakeData,
20✔
120
        inbound_connection: bool,
20✔
121
        distance: u8,
20✔
122
        mocked_time: Timestamp,
20✔
123
    ) -> Self {
20✔
124
        Self {
20✔
125
            to_main_tx,
20✔
126
            global_state_lock,
20✔
127
            peer_address,
20✔
128
            peer_handshake_data,
20✔
129
            inbound_connection,
20✔
130
            distance,
20✔
131
            mock_now: Some(mocked_time),
20✔
132
            rng: StdRng::from_rng(&mut rand::rng()),
20✔
133
        }
20✔
134
    }
20✔
135

136
    /// Overwrite the random number generator object with a specific one.
137
    ///
138
    /// Useful for derandomizing tests.
139
    #[cfg(test)]
140
    fn set_rng(&mut self, rng: StdRng) {
1✔
141
        self.rng = rng;
1✔
142
    }
1✔
143

144
    fn now(&self) -> Timestamp {
27✔
145
        #[cfg(not(test))]
27✔
146
        {
27✔
147
            Timestamp::now()
27✔
148
        }
27✔
149
        #[cfg(test)]
27✔
150
        {
27✔
151
            self.mock_now.unwrap_or(Timestamp::now())
27✔
152
        }
27✔
153
    }
27✔
154

155
    /// Punish a peer for bad behavior.
156
    ///
157
    /// Return `Err` if the peer in question is (now) banned.
158
    ///
159
    /// # Locking:
160
    ///   * acquires `global_state_lock` for write
161
    async fn punish(&mut self, reason: NegativePeerSanction) -> Result<()> {
6✔
162
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
6✔
163
        warn!("Punishing peer {} for {:?}", self.peer_address.ip(), reason);
6✔
164
        debug!(
6✔
165
            "Peer standing before punishment is {}",
×
166
            global_state_mut
×
167
                .net
×
168
                .peer_map
×
169
                .get(&self.peer_address)
×
170
                .unwrap()
×
171
                .standing
172
        );
173

174
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
6✔
175
            bail!("Could not read peer map.");
×
176
        };
177
        let sanction_result = peer_info.standing.sanction(PeerSanction::Negative(reason));
6✔
178
        if let Err(err) = sanction_result {
6✔
179
            warn!("Banning peer: {err}");
1✔
180
        }
5✔
181

182
        sanction_result.map_err(|err| anyhow::anyhow!("Banning peer: {err}"))
6✔
183
    }
6✔
184

185
    /// Reward a peer for good behavior.
186
    ///
187
    /// Return `Err` if the peer in question is banned.
188
    ///
189
    /// # Locking:
190
    ///   * acquires `global_state_lock` for write
191
    async fn reward(&mut self, reason: PositivePeerSanction) -> Result<()> {
7✔
192
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
7✔
193
        info!("Rewarding peer {} for {:?}", self.peer_address.ip(), reason);
7✔
194
        let Some(peer_info) = global_state_mut.net.peer_map.get_mut(&self.peer_address) else {
7✔
195
            error!("Could not read peer map.");
1✔
196
            return Ok(());
1✔
197
        };
198
        let sanction_result = peer_info.standing.sanction(PeerSanction::Positive(reason));
6✔
199
        if sanction_result.is_err() {
6✔
200
            error!("Cannot reward banned peer");
×
201
        }
6✔
202

203
        sanction_result.map_err(|err| anyhow::anyhow!("Cannot reward banned peer: {err}"))
6✔
204
    }
7✔
205

206
    /// Construct a batch response, with blocks and their MMR membership proofs
207
    /// relative to a specified anchor.
208
    ///
209
    /// Returns `None` if the anchor has a lower leaf count than the blocks, or
210
    /// a block height of the response exceeds own tip height.
211
    async fn batch_response(
16✔
212
        state: &GlobalState,
16✔
213
        blocks: Vec<Block>,
16✔
214
        anchor: &MmrAccumulator,
16✔
215
    ) -> Option<Vec<(TransferBlock, MmrMembershipProof)>> {
16✔
216
        let own_tip_height = state.chain.light_state().header().height;
16✔
217
        let block_heights_match_anchor = blocks
16✔
218
            .iter()
16✔
219
            .all(|bl| bl.header().height < anchor.num_leafs().into());
46✔
220
        let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height);
46✔
221
        if !block_heights_match_anchor || !block_heights_known {
16✔
222
            let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() {
×
223
                Some(height) => height.to_string(),
×
224
                None => "None".to_owned(),
×
225
            };
226

227
            debug!("max_block_height: {max_block_height}");
×
228
            debug!("own_tip_height: {own_tip_height}");
×
229
            debug!("anchor.num_leafs(): {}", anchor.num_leafs());
×
230
            debug!("block_heights_match_anchor: {block_heights_match_anchor}");
×
231
            debug!("block_heights_known: {block_heights_known}");
×
232
            return None;
×
233
        }
16✔
234

16✔
235
        let mut ret = vec![];
16✔
236
        for block in blocks {
62✔
237
            let mmr_mp = state
46✔
238
                .chain
46✔
239
                .archival_state()
46✔
240
                .archival_block_mmr
46✔
241
                .ammr()
46✔
242
                .prove_membership_relative_to_smaller_mmr(
46✔
243
                    block.header().height.into(),
46✔
244
                    anchor.num_leafs(),
46✔
245
                )
46✔
246
                .await;
46✔
247
            let block: TransferBlock = block.try_into().unwrap();
46✔
248
            ret.push((block, mmr_mp));
46✔
249
        }
250

251
        Some(ret)
16✔
252
    }
16✔
253

254
    /// Handle validation and send all blocks to the main task if they're all
255
    /// valid. Use with a list of blocks or a single block. When the
256
    /// `received_blocks` is a list, the parent of the `i+1`th block in the
257
    /// list is the `i`th block. The parent of element zero in this list is
258
    /// `parent_of_first_block`.
259
    ///
260
    /// # Return Value
261
    ///  - `Err` when the connection should be closed;
262
    ///  - `Ok(None)` if some block is invalid
263
    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
264
    ///    are not syncing;
265
    ///  - `Ok(None)` if the last block has insufficient height and we are
266
    ///    syncing;
267
    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
268
    ///    highest height in the batch.
269
    ///
270
    /// A return value of Ok(Some(_)) means that the message was passed on to
271
    /// main loop.
272
    ///
273
    /// # Locking
274
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
275
    ///     `self.reward(..)`.
276
    ///
277
    /// # Panics
278
    ///
279
    ///  - Panics if called with the empty list.
280
    async fn handle_blocks(
8✔
281
        &mut self,
8✔
282
        received_blocks: Vec<Block>,
8✔
283
        parent_of_first_block: Block,
8✔
284
    ) -> Result<Option<BlockHeight>> {
8✔
285
        debug!(
8✔
286
            "attempting to validate {} {}",
×
287
            received_blocks.len(),
×
288
            if received_blocks.len() == 1 {
×
289
                "block"
×
290
            } else {
291
                "blocks"
×
292
            }
293
        );
294
        let now = self.now();
8✔
295
        debug!("validating with respect to current timestamp {now}");
8✔
296
        let mut previous_block = &parent_of_first_block;
8✔
297
        for new_block in &received_blocks {
24✔
298
            let new_block_has_proof_of_work = new_block.has_proof_of_work(previous_block.header());
17✔
299
            debug!("new block has proof of work? {new_block_has_proof_of_work}");
17✔
300
            let new_block_is_valid = new_block.is_valid(previous_block, now).await;
17✔
301
            debug!("new block is valid? {new_block_is_valid}");
17✔
302
            if !new_block_has_proof_of_work {
17✔
303
                warn!(
1✔
304
                    "Received invalid proof-of-work for block of height {} from peer with IP {}",
×
305
                    new_block.kernel.header.height, self.peer_address
×
306
                );
307
                warn!("Difficulty is {}.", previous_block.kernel.header.difficulty);
1✔
308
                warn!(
1✔
309
                    "Proof of work should be {} (or more) but was [{}].",
×
310
                    previous_block.kernel.header.difficulty.target(),
×
311
                    new_block.hash().values().iter().join(", ")
×
312
                );
313
                self.punish(NegativePeerSanction::InvalidBlock((
1✔
314
                    new_block.kernel.header.height,
1✔
315
                    new_block.hash(),
1✔
316
                )))
1✔
317
                .await?;
1✔
318
                warn!("Failed to validate block due to insufficient PoW");
1✔
319
                return Ok(None);
1✔
320
            } else if !new_block_is_valid {
16✔
321
                warn!(
×
322
                    "Received invalid block of height {} from peer with IP {}",
×
323
                    new_block.kernel.header.height, self.peer_address
×
324
                );
325
                self.punish(NegativePeerSanction::InvalidBlock((
×
326
                    new_block.kernel.header.height,
×
327
                    new_block.hash(),
×
328
                )))
×
329
                .await?;
×
330
                warn!("Failed to validate block: invalid block");
×
331
                return Ok(None);
×
332
            }
16✔
333
            info!(
16✔
334
                "Block with height {} is valid. mined: {}",
×
335
                new_block.kernel.header.height,
×
336
                new_block.kernel.header.timestamp.standard_format()
×
337
            );
338

339
            previous_block = new_block;
16✔
340
        }
341

342
        // evaluate the fork choice rule
343
        debug!("Checking last block's canonicity ...");
7✔
344
        let last_block = received_blocks.last().unwrap();
7✔
345
        let is_canonical = self
7✔
346
            .global_state_lock
7✔
347
            .lock_guard()
7✔
348
            .await
7✔
349
            .incoming_block_is_more_canonical(last_block);
7✔
350
        let last_block_height = last_block.header().height;
7✔
351
        let sync_mode_active_and_have_new_champion = self
7✔
352
            .global_state_lock
7✔
353
            .lock_guard()
7✔
354
            .await
7✔
355
            .net
356
            .sync_anchor
357
            .as_ref()
7✔
358
            .is_some_and(|x| {
7✔
359
                x.champion
×
360
                    .is_none_or(|(height, _)| height < last_block_height)
×
361
            });
7✔
362
        if !is_canonical && !sync_mode_active_and_have_new_champion {
7✔
363
            warn!(
1✔
364
                "Received {} blocks from peer but incoming blocks are less \
×
365
            canonical than current tip, or current sync-champion.",
×
366
                received_blocks.len()
×
367
            );
368
            return Ok(None);
1✔
369
        }
6✔
370

6✔
371
        // Send the new blocks to the main task which handles the state update
6✔
372
        // and storage to the database.
6✔
373
        let number_of_received_blocks = received_blocks.len();
6✔
374
        self.to_main_tx
6✔
375
            .send(PeerTaskToMain::NewBlocks(received_blocks))
6✔
376
            .await?;
6✔
377
        info!(
6✔
378
            "Updated block info by block from peer. block height {}",
×
379
            last_block_height
380
        );
381

382
        // Valuable, new, hard-to-produce information. Reward peer.
383
        self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
6✔
384
            .await?;
6✔
385

386
        Ok(Some(last_block_height))
6✔
387
    }
8✔
388

389
    /// Take a single block received from a peer and (attempt to) find a path
390
    /// between the received block and a common ancestor stored in the blocks
391
    /// database.
392
    ///
393
    /// This function attempts to find the parent of the received block, either
394
    /// by searching the database or by requesting it from a peer.
395
    ///  - If the parent is not stored, it is requested from the peer and the
396
    ///    received block is pushed to the fork reconciliation list for later
397
    ///    handling by this function. The fork reconciliation list starts out
398
    ///    empty, but grows as more parents are requested and transmitted.
399
    ///  - If the parent is found in the database, a) block handling continues:
400
    ///    the entire list of fork reconciliation blocks are passed down the
401
    ///    pipeline, potentially leading to a state update; and b) the fork
402
    ///    reconciliation list is cleared.
403
    ///
404
    /// Locking:
405
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
406
    ///     `self.reward(..)`.
407
    async fn try_ensure_path<S>(
20✔
408
        &mut self,
20✔
409
        received_block: Box<Block>,
20✔
410
        peer: &mut S,
20✔
411
        peer_state: &mut MutablePeerState,
20✔
412
    ) -> Result<()>
20✔
413
    where
20✔
414
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
20✔
415
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
20✔
416
        <S as TryStream>::Error: std::error::Error,
20✔
417
    {
20✔
418
        // Does the received block match the fork reconciliation list?
419
        let received_block_matches_fork_reconciliation_list = if let Some(successor) =
20✔
420
            peer_state.fork_reconciliation_blocks.last()
20✔
421
        {
422
            let valid = successor
10✔
423
                .is_valid(received_block.as_ref(), self.now())
10✔
424
                .await;
10✔
425
            if !valid {
10✔
426
                warn!(
×
427
                        "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
×
428
                        peer_state.fork_reconciliation_blocks.len() + 1
×
429
                    );
430
            }
10✔
431
            valid
10✔
432
        } else {
433
            true
10✔
434
        };
435

436
        // Are we running out of RAM?
437
        let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
20✔
438
            >= self.global_state_lock.cli().sync_mode_threshold;
20✔
439
        if too_many_blocks {
20✔
440
            warn!(
1✔
441
                "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
×
442
                peer_state.fork_reconciliation_blocks.len() + 1
×
443
            );
444
        }
19✔
445

446
        // Block mismatch or too many blocks: abort!
447
        if !received_block_matches_fork_reconciliation_list || too_many_blocks {
20✔
448
            self.punish(NegativePeerSanction::ForkResolutionError((
1✔
449
                received_block.header().height,
1✔
450
                peer_state.fork_reconciliation_blocks.len() as u16,
1✔
451
                received_block.hash(),
1✔
452
            )))
1✔
453
            .await?;
1✔
454
            peer_state.fork_reconciliation_blocks = vec![];
1✔
455
            return Ok(());
1✔
456
        }
19✔
457

19✔
458
        // otherwise, append
19✔
459
        peer_state.fork_reconciliation_blocks.push(*received_block);
19✔
460

19✔
461
        // Try fetch parent
19✔
462
        let received_block_header = *peer_state
19✔
463
            .fork_reconciliation_blocks
19✔
464
            .last()
19✔
465
            .unwrap()
19✔
466
            .header();
19✔
467

19✔
468
        let parent_digest = received_block_header.prev_block_digest;
19✔
469
        let parent_height = received_block_header.height.previous()
19✔
470
            .expect("transferred block must have previous height because genesis block cannot be transferred");
19✔
471
        debug!("Try ensure path: fetching parent block");
19✔
472
        let parent_block = self
19✔
473
            .global_state_lock
19✔
474
            .lock_guard()
19✔
475
            .await
19✔
476
            .chain
477
            .archival_state()
19✔
478
            .get_block(parent_digest)
19✔
479
            .await?;
19✔
480
        debug!(
19✔
481
            "Completed parent block fetching from DB: {}",
×
482
            if parent_block.is_some() {
×
483
                "found".to_string()
×
484
            } else {
485
                "not found".to_string()
×
486
            }
487
        );
488

489
        // If parent is not known (but not genesis) request it.
490
        let Some(parent_block) = parent_block else {
19✔
491
            if parent_height.is_genesis() {
11✔
492
                peer_state.fork_reconciliation_blocks.clear();
1✔
493
                self.punish(NegativePeerSanction::DifferentGenesis).await?;
1✔
494
                return Ok(());
×
495
            }
10✔
496
            info!(
10✔
497
                "Parent not known: Requesting previous block with height {} from peer",
×
498
                parent_height
499
            );
500

501
            peer.send(PeerMessage::BlockRequestByHash(parent_digest))
10✔
502
                .await?;
10✔
503

504
            return Ok(());
10✔
505
        };
506

507
        // We want to treat the received fork reconciliation blocks (plus the
508
        // received block) in reverse order, from oldest to newest, because
509
        // they were requested from high to low block height.
510
        let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
8✔
511
        new_blocks.reverse();
8✔
512

8✔
513
        // Reset the fork resolution state since we got all the way back to a
8✔
514
        // block that we have.
8✔
515
        let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
8✔
516
        peer_state.fork_reconciliation_blocks.clear();
8✔
517

518
        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
8✔
519
            // If `BlockNotification` was received during a block reconciliation
520
            // event, then the peer might have one (or more (unlikely)) blocks
521
            // that we do not have. We should thus request those blocks.
522
            if fork_reconciliation_event
6✔
523
                && peer_state.highest_shared_block_height > new_block_height
6✔
524
            {
525
                peer.send(PeerMessage::BlockRequestByHeight(
1✔
526
                    peer_state.highest_shared_block_height,
1✔
527
                ))
1✔
528
                .await?;
1✔
529
            }
5✔
530
        }
2✔
531

532
        Ok(())
8✔
533
    }
20✔
534

535
    /// Handle peer messages and returns Ok(true) if connection should be closed.
536
    /// Connection should also be closed if an error is returned.
537
    /// Otherwise, returns OK(false).
538
    ///
539
    /// Locking:
540
    ///   * Acquires `global_state_lock` for read.
541
    ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
542
    ///     `self.reward(..)`.
543
    async fn handle_peer_message<S>(
93✔
544
        &mut self,
93✔
545
        msg: PeerMessage,
93✔
546
        peer: &mut S,
93✔
547
        peer_state_info: &mut MutablePeerState,
93✔
548
    ) -> Result<bool>
93✔
549
    where
93✔
550
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
93✔
551
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
93✔
552
        <S as TryStream>::Error: std::error::Error,
93✔
553
    {
93✔
554
        debug!(
93✔
555
            "Received {} from peer {}",
×
556
            msg.get_type(),
×
557
            self.peer_address
558
        );
559
        match msg {
93✔
560
            PeerMessage::Bye => {
561
                // Note that the current peer is not removed from the global_state.peer_map here
562
                // but that this is done by the caller.
563
                info!("Got bye. Closing connection to peer");
40✔
564
                Ok(DISCONNECT_CONNECTION)
40✔
565
            }
566
            PeerMessage::PeerListRequest => {
567
                let peer_info = {
2✔
568
                    log_slow_scope!(fn_name!() + "::PeerMessage::PeerListRequest");
2✔
569

570
                    // We are interested in the address on which peers accept ingoing connections,
571
                    // not in the address in which they are connected to us. We are only interested in
572
                    // peers that accept incoming connections.
573
                    let mut peer_info: Vec<(SocketAddr, u128)> = self
2✔
574
                        .global_state_lock
2✔
575
                        .lock_guard()
2✔
576
                        .await
2✔
577
                        .net
578
                        .peer_map
579
                        .values()
2✔
580
                        .filter(|peer_info| peer_info.listen_address().is_some())
5✔
581
                        .take(MAX_PEER_LIST_LENGTH) // limit length of response
2✔
582
                        .map(|peer_info| {
5✔
583
                            (
5✔
584
                                // unwrap is safe bc of above `filter`
5✔
585
                                peer_info.listen_address().unwrap(),
5✔
586
                                peer_info.instance_id(),
5✔
587
                            )
5✔
588
                        })
5✔
589
                        .collect();
2✔
590

2✔
591
                    // We sort the returned list, so this function is easier to test
2✔
592
                    peer_info.sort_by_cached_key(|x| x.0);
5✔
593
                    peer_info
2✔
594
                };
2✔
595

2✔
596
                debug!("Responding with: {:?}", peer_info);
2✔
597
                peer.send(PeerMessage::PeerListResponse(peer_info)).await?;
2✔
598
                Ok(KEEP_CONNECTION_ALIVE)
2✔
599
            }
600
            PeerMessage::PeerListResponse(peers) => {
×
601
                log_slow_scope!(fn_name!() + "::PeerMessage::PeerListResponse");
×
602

×
603
                if peers.len() > MAX_PEER_LIST_LENGTH {
×
604
                    self.punish(NegativePeerSanction::FloodPeerListResponse)
×
605
                        .await?;
×
606
                }
×
607
                self.to_main_tx
×
608
                    .send(PeerTaskToMain::PeerDiscoveryAnswer((
×
609
                        peers,
×
610
                        self.peer_address,
×
611
                        // The distance to the revealed peers is 1 + this peer's distance
×
612
                        self.distance + 1,
×
613
                    )))
×
614
                    .await?;
×
615
                Ok(KEEP_CONNECTION_ALIVE)
×
616
            }
617
            PeerMessage::BlockNotificationRequest => {
618
                debug!("Got BlockNotificationRequest");
×
619

620
                peer.send(PeerMessage::BlockNotification(
×
621
                    self.global_state_lock
×
622
                        .lock_guard()
×
623
                        .await
×
624
                        .chain
625
                        .light_state()
×
626
                        .into(),
×
627
                ))
×
628
                .await?;
×
629

630
                Ok(KEEP_CONNECTION_ALIVE)
×
631
            }
632
            PeerMessage::BlockNotification(block_notification) => {
4✔
633
                const SYNC_CHALLENGE_COOLDOWN: Timestamp = Timestamp::minutes(10);
634

635
                let (tip_header, sync_anchor_is_set) = {
4✔
636
                    let state = self.global_state_lock.lock_guard().await;
4✔
637
                    (
4✔
638
                        *state.chain.light_state().header(),
4✔
639
                        state.net.sync_anchor.is_some(),
4✔
640
                    )
4✔
641
                };
4✔
642
                debug!(
4✔
643
                    "Got BlockNotification of height {}. Own height is {}",
×
644
                    block_notification.height, tip_header.height
645
                );
646

647
                let sync_mode_threshold = self.global_state_lock.cli().sync_mode_threshold;
4✔
648
                let now = self.now();
4✔
649
                let time_since_latest_successful_challenge = peer_state_info
4✔
650
                    .successful_sync_challenge_response_time
4✔
651
                    .map(|then| now - then);
4✔
652
                let cooldown_expired = time_since_latest_successful_challenge
4✔
653
                    .is_none_or(|time_passed| time_passed > SYNC_CHALLENGE_COOLDOWN);
4✔
654
                let exceeds_sync_mode_threshold = GlobalState::sync_mode_threshold_stateless(
4✔
655
                    &tip_header,
4✔
656
                    block_notification.height,
4✔
657
                    block_notification.cumulative_proof_of_work,
4✔
658
                    sync_mode_threshold,
4✔
659
                );
4✔
660
                if cooldown_expired && exceeds_sync_mode_threshold {
4✔
661
                    debug!("sync mode criterion satisfied.");
1✔
662

663
                    if peer_state_info.sync_challenge.is_some() {
1✔
664
                        warn!("Cannot launch new sync challenge because one is already on-going.");
×
665
                        return Ok(KEEP_CONNECTION_ALIVE);
×
666
                    }
1✔
667

1✔
668
                    info!(
1✔
669
                        "Peer indicates block which satisfies sync mode criterion, issuing challenge."
×
670
                    );
671
                    let challenge = SyncChallenge::generate(
1✔
672
                        &block_notification,
1✔
673
                        tip_header.height,
1✔
674
                        self.rng.random(),
1✔
675
                    );
1✔
676
                    peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new(
1✔
677
                        challenge,
1✔
678
                        block_notification.cumulative_proof_of_work,
1✔
679
                        self.now(),
1✔
680
                    ));
1✔
681

1✔
682
                    debug!("sending challenge ...");
1✔
683
                    peer.send(PeerMessage::SyncChallenge(challenge)).await?;
1✔
684

685
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
686
                }
3✔
687

3✔
688
                peer_state_info.highest_shared_block_height = block_notification.height;
3✔
689
                let block_is_new = tip_header.cumulative_proof_of_work
3✔
690
                    < block_notification.cumulative_proof_of_work;
3✔
691

3✔
692
                debug!("block_is_new: {}", block_is_new);
3✔
693

694
                if block_is_new
3✔
695
                    && peer_state_info.fork_reconciliation_blocks.is_empty()
3✔
696
                    && !sync_anchor_is_set
2✔
697
                    && !exceeds_sync_mode_threshold
2✔
698
                {
699
                    debug!(
1✔
700
                        "sending BlockRequestByHeight to peer for block with height {}",
×
701
                        block_notification.height
702
                    );
703
                    peer.send(PeerMessage::BlockRequestByHeight(block_notification.height))
1✔
704
                        .await?;
1✔
705
                } else {
706
                    debug!(
2✔
707
                        "ignoring peer block. height {}. new: {}, reconciling_fork: {}",
×
708
                        block_notification.height,
×
709
                        block_is_new,
×
710
                        !peer_state_info.fork_reconciliation_blocks.is_empty()
×
711
                    );
712
                }
713

714
                Ok(KEEP_CONNECTION_ALIVE)
3✔
715
            }
716
            PeerMessage::SyncChallenge(sync_challenge) => {
2✔
717
                let response = {
×
718
                    log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallenge");
2✔
719

2✔
720
                    info!("Got sync challenge from {}", self.peer_address.ip());
2✔
721

722
                    let response = self
2✔
723
                        .global_state_lock
2✔
724
                        .lock_guard()
2✔
725
                        .await
2✔
726
                        .response_to_sync_challenge(sync_challenge)
2✔
727
                        .await;
2✔
728

729
                    match response {
2✔
730
                        Ok(resp) => resp,
×
731
                        Err(e) => {
2✔
732
                            warn!("could not generate sync challenge response:\n{e}");
2✔
733
                            self.punish(NegativePeerSanction::InvalidSyncChallenge)
2✔
734
                                .await?;
2✔
735
                            return Ok(KEEP_CONNECTION_ALIVE);
2✔
736
                        }
737
                    }
738
                };
739

740
                info!(
×
741
                    "Responding to sync challenge from {}",
×
742
                    self.peer_address.ip()
×
743
                );
744
                peer.send(PeerMessage::SyncChallengeResponse(Box::new(response)))
×
745
                    .await?;
×
746

747
                Ok(KEEP_CONNECTION_ALIVE)
×
748
            }
749
            PeerMessage::SyncChallengeResponse(challenge_response) => {
1✔
750
                const SYNC_RESPONSE_TIMEOUT: Timestamp = Timestamp::seconds(45);
751

752
                log_slow_scope!(fn_name!() + "::PeerMessage::SyncChallengeResponse");
1✔
753
                info!(
1✔
754
                    "Got sync challenge response from {}",
×
755
                    self.peer_address.ip()
×
756
                );
757

758
                // The purpose of the sync challenge and sync challenge response
759
                // is to avoid going into sync mode based on a malicious target
760
                // fork. Instead of verifying that the claimed proof-of-work
761
                // number is correct (which would require sending and verifying,
762
                // at least, all blocks between luca (whatever that is) and the
763
                // claimed tip), we use a heuristic that requires less
764
                // communication and less verification work. The downside of
765
                // using a heuristic here is a nonzero false positive and false
766
                // negative rate. Note that the false negative event
767
                // (maliciously sending someone into sync mode based on a bogus
768
                // fork) still requires a significant amount of work from the
769
                // attacker, *in addition* to being lucky. Also, down the line
770
                // succinctness (and more specifically, recursive block
771
                // validation) obviates this entire subprotocol.
772

773
                // Did we issue a challenge?
774
                let Some(issued_challenge) = peer_state_info.sync_challenge else {
1✔
775
                    warn!("Sync challenge response was not prompted.");
×
776
                    self.punish(NegativePeerSanction::UnexpectedSyncChallengeResponse)
×
777
                        .await?;
×
778
                    return Ok(KEEP_CONNECTION_ALIVE);
×
779
                };
780

781
                // Reset the challenge, regardless of the response's success.
782
                peer_state_info.sync_challenge = None;
1✔
783

1✔
784
                // Does response match issued challenge?
1✔
785
                if !challenge_response.matches(issued_challenge) {
1✔
786
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
787
                        .await?;
×
788
                    return Ok(KEEP_CONNECTION_ALIVE);
×
789
                }
1✔
790

1✔
791
                // Does response verify?
1✔
792
                let claimed_tip_height = challenge_response.tip.header.height;
1✔
793
                let now = self.now();
1✔
794
                if !challenge_response.is_valid(now).await {
1✔
795
                    self.punish(NegativePeerSanction::InvalidSyncChallengeResponse)
×
796
                        .await?;
×
797
                    return Ok(KEEP_CONNECTION_ALIVE);
×
798
                }
1✔
799

800
                // Does cumulative proof-of-work evolve reasonably?
801
                let own_tip_header = *self
1✔
802
                    .global_state_lock
1✔
803
                    .lock_guard()
1✔
804
                    .await
1✔
805
                    .chain
806
                    .light_state()
1✔
807
                    .header();
1✔
808
                if !challenge_response
1✔
809
                    .check_pow(self.global_state_lock.cli().network, own_tip_header.height)
1✔
810
                {
811
                    self.punish(NegativePeerSanction::FishyPowEvolutionChallengeResponse)
×
812
                        .await?;
×
813
                    return Ok(KEEP_CONNECTION_ALIVE);
×
814
                }
1✔
815

1✔
816
                // Is there some specific (*i.e.*, not aggregate) proof of work?
1✔
817
                if !challenge_response.check_difficulty(own_tip_header.difficulty) {
1✔
818
                    self.punish(NegativePeerSanction::FishyDifficultiesChallengeResponse)
×
819
                        .await?;
×
820
                    return Ok(KEEP_CONNECTION_ALIVE);
×
821
                }
1✔
822

1✔
823
                // Did it come in time?
1✔
824
                if now - issued_challenge.issued_at > SYNC_RESPONSE_TIMEOUT {
1✔
825
                    self.punish(NegativePeerSanction::TimedOutSyncChallengeResponse)
×
826
                        .await?;
×
827
                    return Ok(KEEP_CONNECTION_ALIVE);
×
828
                }
1✔
829

1✔
830
                info!("Successful sync challenge response; relaying peer tip info to main loop.");
1✔
831
                peer_state_info.successful_sync_challenge_response_time = Some(now);
1✔
832

1✔
833
                let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator;
1✔
834
                sync_mmra_anchor.append(issued_challenge.challenge.tip_digest);
1✔
835

1✔
836
                // Inform main loop
1✔
837
                self.to_main_tx
1✔
838
                    .send(PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
839
                        peer_address: self.peer_address,
1✔
840
                        claimed_height: claimed_tip_height,
1✔
841
                        claimed_cumulative_pow: issued_challenge.accumulated_pow,
1✔
842
                        claimed_block_mmra: sync_mmra_anchor,
1✔
843
                    })
1✔
844
                    .await?;
1✔
845

846
                Ok(KEEP_CONNECTION_ALIVE)
1✔
847
            }
848
            PeerMessage::BlockRequestByHash(block_digest) => {
×
849
                match self
×
850
                    .global_state_lock
×
851
                    .lock_guard()
×
852
                    .await
×
853
                    .chain
854
                    .archival_state()
×
855
                    .get_block(block_digest)
×
856
                    .await?
×
857
                {
858
                    None => {
859
                        // TODO: Consider punishing here
860
                        warn!("Peer requested unknown block with hash {}", block_digest);
×
861
                        Ok(KEEP_CONNECTION_ALIVE)
×
862
                    }
863
                    Some(b) => {
×
864
                        peer.send(PeerMessage::Block(Box::new(b.try_into().unwrap())))
×
865
                            .await?;
×
866
                        Ok(KEEP_CONNECTION_ALIVE)
×
867
                    }
868
                }
869
            }
870
            PeerMessage::BlockRequestByHeight(block_height) => {
4✔
871
                let block_response = {
3✔
872
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestByHeight");
4✔
873

4✔
874
                    debug!("Got BlockRequestByHeight of height {}", block_height);
4✔
875

876
                    let canonical_block_digest = self
4✔
877
                        .global_state_lock
4✔
878
                        .lock_guard()
4✔
879
                        .await
4✔
880
                        .chain
881
                        .archival_state()
4✔
882
                        .archival_block_mmr
4✔
883
                        .ammr()
4✔
884
                        .try_get_leaf(block_height.into())
4✔
885
                        .await;
4✔
886

887
                    let canonical_block_digest = match canonical_block_digest {
4✔
888
                        None => {
889
                            let own_tip_height = self
1✔
890
                                .global_state_lock
1✔
891
                                .lock_guard()
1✔
892
                                .await
1✔
893
                                .chain
894
                                .light_state()
1✔
895
                                .header()
1✔
896
                                .height;
1✔
897
                            warn!("Got block request by height ({block_height}) for unknown block. Own tip height is {own_tip_height}.");
1✔
898
                            self.punish(NegativePeerSanction::BlockRequestUnknownHeight)
1✔
899
                                .await?;
1✔
900

901
                            return Ok(KEEP_CONNECTION_ALIVE);
1✔
902
                        }
903
                        Some(digest) => digest,
3✔
904
                    };
905

906
                    let canonical_chain_block: Block = self
3✔
907
                        .global_state_lock
3✔
908
                        .lock_guard()
3✔
909
                        .await
3✔
910
                        .chain
911
                        .archival_state()
3✔
912
                        .get_block(canonical_block_digest)
3✔
913
                        .await?
3✔
914
                        .unwrap();
3✔
915

3✔
916
                    PeerMessage::Block(Box::new(canonical_chain_block.try_into().unwrap()))
3✔
917
                };
3✔
918

3✔
919
                debug!("Sending block");
3✔
920
                peer.send(block_response).await?;
3✔
921
                debug!("Sent block");
3✔
922
                Ok(KEEP_CONNECTION_ALIVE)
3✔
923
            }
924
            PeerMessage::Block(t_block) => {
20✔
925
                log_slow_scope!(fn_name!() + "::PeerMessage::Block");
20✔
926

20✔
927
                info!(
20✔
928
                    "Got new block from peer {}, height {}, mined {}",
×
929
                    self.peer_address,
×
930
                    t_block.header.height,
×
931
                    t_block.header.timestamp.standard_format()
×
932
                );
933
                let new_block_height = t_block.header.height;
20✔
934

935
                let block = match Block::try_from(*t_block) {
20✔
936
                    Ok(block) => Box::new(block),
20✔
937
                    Err(e) => {
×
938
                        warn!("Peer sent invalid block: {e:?}");
×
939
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
940
                            .await?;
×
941

942
                        return Ok(KEEP_CONNECTION_ALIVE);
×
943
                    }
944
                };
945

946
                // Update the value for the highest known height that peer possesses iff
947
                // we are not in a fork reconciliation state.
948
                if peer_state_info.fork_reconciliation_blocks.is_empty() {
20✔
949
                    peer_state_info.highest_shared_block_height = new_block_height;
10✔
950
                }
10✔
951

952
                self.try_ensure_path(block, peer, peer_state_info).await?;
20✔
953

954
                // Reward happens as part of `try_ensure_path`
955

956
                Ok(KEEP_CONNECTION_ALIVE)
19✔
957
            }
958
            PeerMessage::BlockRequestBatch(BlockRequestBatch {
959
                known_blocks,
8✔
960
                max_response_len,
8✔
961
                anchor,
8✔
962
            }) => {
8✔
963
                debug!(
8✔
964
                    "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}",
×
965
                    self.peer_address
966
                );
967

968
                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
8✔
969
                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
×
970
                        .await?;
×
971

972
                    return Ok(KEEP_CONNECTION_ALIVE);
×
973
                }
8✔
974

975
                // The last block in the list of the peers known block is the
976
                // earliest block, block with lowest height, the peer has
977
                // requested. If it does not belong to canonical chain, none of
978
                // the later will. So we can do an early abort in that case.
979
                let least_preferred = match known_blocks.last() {
8✔
980
                    Some(least_preferred) => *least_preferred,
8✔
981
                    None => {
982
                        self.punish(NegativePeerSanction::BatchBlocksRequestEmpty)
×
983
                            .await?;
×
984

985
                        return Ok(KEEP_CONNECTION_ALIVE);
×
986
                    }
987
                };
988

989
                let state = self.global_state_lock.lock_guard().await;
8✔
990
                let block_mmr_num_leafs = state.chain.light_state().header().height.next().into();
8✔
991
                let luca_is_known = state
8✔
992
                    .chain
8✔
993
                    .archival_state()
8✔
994
                    .block_belongs_to_canonical_chain(least_preferred)
8✔
995
                    .await;
8✔
996
                if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs {
8✔
997
                    drop(state);
×
998
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
999
                        .await?;
×
1000
                    peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?;
×
1001

1002
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1003
                }
8✔
1004

1005
                // Happy case: At least *one* of the blocks referenced by peer
1006
                // is known to us.
1007
                let first_block_in_response = {
8✔
1008
                    let mut first_block_in_response: Option<BlockHeight> = None;
8✔
1009
                    for block_digest in known_blocks {
10✔
1010
                        if state
10✔
1011
                            .chain
10✔
1012
                            .archival_state()
10✔
1013
                            .block_belongs_to_canonical_chain(block_digest)
10✔
1014
                            .await
10✔
1015
                        {
1016
                            let height = state
8✔
1017
                                .chain
8✔
1018
                                .archival_state()
8✔
1019
                                .get_block_header(block_digest)
8✔
1020
                                .await
8✔
1021
                                .unwrap()
8✔
1022
                                .height;
8✔
1023
                            first_block_in_response = Some(height);
8✔
1024
                            debug!(
8✔
1025
                                "Found block in canonical chain for batch response: {}",
×
1026
                                block_digest
1027
                            );
1028
                            break;
8✔
1029
                        }
2✔
1030
                    }
1031

1032
                    first_block_in_response
8✔
1033
                        .expect("existence of LUCA should have been established already.")
8✔
1034
                };
8✔
1035

8✔
1036
                debug!(
8✔
1037
                    "Peer's most preferred block has height {first_block_in_response}.\
×
1038
                 Now building response from that height."
×
1039
                );
1040

1041
                // Get the relevant blocks, at most batch-size many, descending from the
1042
                // peer's (alleged) most canonical block. Don't exceed `max_response_len`
1043
                // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response.
1044
                let max_response_len = cmp::min(
8✔
1045
                    max_response_len,
8✔
1046
                    self.global_state_lock.cli().sync_mode_threshold,
8✔
1047
                );
8✔
1048
                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
8✔
1049
                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
8✔
1050

8✔
1051
                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
8✔
1052
                let response_start_height: u64 = first_block_in_response.into();
8✔
1053
                let mut i: u64 = 1;
8✔
1054
                while digests_of_returned_blocks.len() < max_response_len {
31✔
1055
                    let block_height = response_start_height + i;
31✔
1056
                    match state
31✔
1057
                        .chain
31✔
1058
                        .archival_state()
31✔
1059
                        .archival_block_mmr
31✔
1060
                        .ammr()
31✔
1061
                        .try_get_leaf(block_height)
31✔
1062
                        .await
31✔
1063
                    {
1064
                        Some(digest) => {
23✔
1065
                            digests_of_returned_blocks.push(digest);
23✔
1066
                        }
23✔
1067
                        None => break,
8✔
1068
                    }
1069
                    i += 1;
23✔
1070
                }
1071

1072
                let mut returned_blocks: Vec<Block> =
8✔
1073
                    Vec::with_capacity(digests_of_returned_blocks.len());
8✔
1074
                for block_digest in digests_of_returned_blocks {
31✔
1075
                    let block = state
23✔
1076
                        .chain
23✔
1077
                        .archival_state()
23✔
1078
                        .get_block(block_digest)
23✔
1079
                        .await?
23✔
1080
                        .unwrap();
23✔
1081
                    returned_blocks.push(block);
23✔
1082
                }
1083

1084
                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
8✔
1085

1086
                // issue 457. do not hold lock across a peer.send(), nor self.punish()
1087
                drop(state);
8✔
1088

1089
                let Some(response) = response else {
8✔
1090
                    warn!("Unable to satisfy batch-block request");
×
1091
                    self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
×
1092
                        .await?;
×
1093
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1094
                };
1095

1096
                debug!("Returning {} blocks in batch response", response.len());
8✔
1097

1098
                let response = PeerMessage::BlockResponseBatch(response);
8✔
1099
                peer.send(response).await?;
8✔
1100

1101
                Ok(KEEP_CONNECTION_ALIVE)
8✔
1102
            }
1103
            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
×
1104
                log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
×
1105

×
1106
                debug!(
×
1107
                    "handling block response batch with {} blocks",
×
1108
                    authenticated_blocks.len()
×
1109
                );
1110

1111
                // (Alan:) why is there even a minimum?
1112
                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
×
1113
                    warn!("Got smaller batch response than allowed");
×
1114
                    self.punish(NegativePeerSanction::TooShortBlockBatch)
×
1115
                        .await?;
×
1116
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1117
                }
×
1118

1119
                // Verify that we are in fact in syncing mode
1120
                // TODO: Separate peer messages into those allowed under syncing
1121
                // and those that are not
1122
                let Some(sync_anchor) = self
×
1123
                    .global_state_lock
×
1124
                    .lock_guard()
×
1125
                    .await
×
1126
                    .net
1127
                    .sync_anchor
1128
                    .clone()
×
1129
                else {
1130
                    warn!("Received a batch of blocks without being in syncing mode");
×
1131
                    self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
×
1132
                        .await?;
×
1133
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1134
                };
1135

1136
                // Verify that the response matches the current state
1137
                // We get the latest block from the DB here since this message is
1138
                // only valid for archival nodes.
1139
                let (first_block, _) = &authenticated_blocks[0];
×
1140
                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
×
1141
                let most_canonical_own_block_match: Option<Block> = self
×
1142
                    .global_state_lock
×
1143
                    .lock_guard()
×
1144
                    .await
×
1145
                    .chain
1146
                    .archival_state()
×
1147
                    .get_block(first_blocks_parent_digest)
×
1148
                    .await
×
1149
                    .expect("Block lookup must succeed");
×
1150
                let most_canonical_own_block_match: Block = match most_canonical_own_block_match {
×
1151
                    Some(block) => block,
×
1152
                    None => {
1153
                        warn!("Got batch response with invalid start block");
×
1154
                        self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight)
×
1155
                            .await?;
×
1156
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1157
                    }
1158
                };
1159

1160
                // Convert all blocks to Block objects
1161
                debug!(
×
1162
                    "Found own block of height {} to match received batch",
×
1163
                    most_canonical_own_block_match.kernel.header.height
×
1164
                );
1165
                let mut received_blocks = vec![];
×
1166
                for (t_block, membership_proof) in authenticated_blocks {
×
1167
                    let Ok(block) = Block::try_from(t_block) else {
×
1168
                        warn!("Received invalid transfer block from peer");
×
1169
                        self.punish(NegativePeerSanction::InvalidTransferBlock)
×
1170
                            .await?;
×
1171
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1172
                    };
1173

1174
                    if !membership_proof.verify(
×
1175
                        block.header().height.into(),
×
1176
                        block.hash(),
×
1177
                        &sync_anchor.block_mmr.peaks(),
×
1178
                        sync_anchor.block_mmr.num_leafs(),
×
1179
                    ) {
×
1180
                        warn!("Authentication of received block fails relative to anchor");
×
1181
                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
×
1182
                            .await?;
×
1183
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1184
                    }
×
1185

×
1186
                    received_blocks.push(block);
×
1187
                }
1188

1189
                // Get the latest block that we know of and handle all received blocks
1190
                self.handle_blocks(received_blocks, most_canonical_own_block_match)
×
1191
                    .await?;
×
1192

1193
                // Reward happens as part of `handle_blocks`.
1194

1195
                Ok(KEEP_CONNECTION_ALIVE)
×
1196
            }
1197
            PeerMessage::UnableToSatisfyBatchRequest => {
1198
                log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest");
×
1199
                warn!(
×
1200
                    "Peer {} reports inability to satisfy batch request.",
×
1201
                    self.peer_address
1202
                );
1203

1204
                Ok(KEEP_CONNECTION_ALIVE)
×
1205
            }
1206
            PeerMessage::Handshake(_) => {
1207
                log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");
×
1208

×
1209
                // The handshake should have been sent during connection
×
1210
                // initialization. Here it is out of order at best, malicious at
×
1211
                // worst.
×
1212
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1213
                Ok(KEEP_CONNECTION_ALIVE)
×
1214
            }
1215
            PeerMessage::ConnectionStatus(_) => {
1216
                log_slow_scope!(fn_name!() + "::PeerMessage::ConnectionStatus");
×
1217

×
1218
                // The connection status should have been sent during connection
×
1219
                // initialization. Here it is out of order at best, malicious at
×
1220
                // worst.
×
1221

×
1222
                self.punish(NegativePeerSanction::InvalidMessage).await?;
×
1223
                Ok(KEEP_CONNECTION_ALIVE)
×
1224
            }
1225
            PeerMessage::Transaction(transaction) => {
2✔
1226
                log_slow_scope!(fn_name!() + "::PeerMessage::Transaction");
2✔
1227

2✔
1228
                debug!(
2✔
1229
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1230
                    transaction.kernel.inputs.len(),
×
1231
                    transaction.kernel.outputs.len(),
×
1232
                    transaction.kernel.mutator_set_hash
×
1233
                );
1234

1235
                let transaction: Transaction = (*transaction).into();
2✔
1236

2✔
1237
                // 1. If transaction is invalid, punish.
2✔
1238
                if !transaction.is_valid().await {
2✔
1239
                    warn!("Received invalid tx");
×
1240
                    self.punish(NegativePeerSanction::InvalidTransaction)
×
1241
                        .await?;
×
1242
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1243
                }
2✔
1244

2✔
1245
                // 2. If transaction has coinbase, punish.
2✔
1246
                // Transactions received from peers have not been mined yet.
2✔
1247
                // Only the miner is allowed to produce transactions with non-empty coinbase fields.
2✔
1248
                if transaction.kernel.coinbase.is_some() {
2✔
1249
                    warn!("Received non-mined transaction with coinbase.");
×
1250
                    self.punish(NegativePeerSanction::NonMinedTransactionHasCoinbase)
×
1251
                        .await?;
×
1252
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1253
                }
2✔
1254

2✔
1255
                // 3. If negative fee, punish.
2✔
1256
                if transaction.kernel.fee.is_negative() {
2✔
1257
                    warn!("Received negative-fee transaction.");
×
1258
                    self.punish(NegativePeerSanction::TransactionWithNegativeFee)
×
1259
                        .await?;
×
1260
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1261
                }
2✔
1262

2✔
1263
                // 4. If transaction is already known, ignore.
2✔
1264
                if self
2✔
1265
                    .global_state_lock
2✔
1266
                    .lock_guard()
2✔
1267
                    .await
2✔
1268
                    .mempool
1269
                    .contains_with_higher_proof_quality(
1270
                        transaction.kernel.txid(),
2✔
1271
                        transaction.proof.proof_quality()?,
2✔
1272
                    )
1273
                {
1274
                    warn!("Received transaction that was already known");
×
1275

1276
                    // We received a transaction that we *probably* haven't requested.
1277
                    // Consider punishing here, if this is abused.
1278
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1279
                }
2✔
1280

1281
                // 5. if transaction is not confirmable, punish.
1282
                let (tip, mutator_set_accumulator_after) = {
2✔
1283
                    let state = self.global_state_lock.lock_guard().await;
2✔
1284

1285
                    (
2✔
1286
                        state.chain.light_state().hash(),
2✔
1287
                        state.chain.light_state().mutator_set_accumulator_after(),
2✔
1288
                    )
2✔
1289
                };
2✔
1290
                if !transaction.is_confirmable_relative_to(&mutator_set_accumulator_after) {
2✔
1291
                    warn!(
×
1292
                        "Received unconfirmable transaction with TXID {}. Unconfirmable because:",
×
1293
                        transaction.kernel.txid()
×
1294
                    );
1295
                    // get fine-grained error code for informative logging
1296
                    let confirmability_error_code = transaction
×
1297
                        .kernel
×
1298
                        .is_confirmable_relative_to(&mutator_set_accumulator_after);
×
1299
                    match confirmability_error_code {
×
1300
                        Ok(_) => unreachable!(),
×
1301
                        Err(TransactionConfirmabilityError::InvalidRemovalRecord(index)) => {
×
1302
                            warn!("invalid removal record (at index {index})");
×
1303
                            let invalid_removal_record = transaction.kernel.inputs[index].clone();
×
1304
                            let removal_record_error_code = invalid_removal_record
×
1305
                                .validate_inner(&mutator_set_accumulator_after);
×
1306
                            debug!(
×
1307
                                "Absolute index set of removal record {index}: {:?}",
×
1308
                                invalid_removal_record.absolute_indices
1309
                            );
1310
                            match removal_record_error_code {
×
1311
                                Ok(_) => unreachable!(),
×
1312
                                Err(RemovalRecordValidityError::AbsentAuthenticatedChunk) => {
1313
                                    debug!("invalid because membership proof is missing");
×
1314
                                }
1315
                                Err(RemovalRecordValidityError::InvalidSwbfiMmrMp {
1316
                                    chunk_index,
×
1317
                                }) => {
×
1318
                                    debug!("invalid because membership proof for chunk index {chunk_index} is invalid");
×
1319
                                }
1320
                            };
1321
                            self.punish(NegativePeerSanction::UnconfirmableTransaction)
×
1322
                                .await?;
×
1323
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1324
                        }
1325
                        Err(TransactionConfirmabilityError::DuplicateInputs) => {
1326
                            warn!("duplicate inputs");
×
1327
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1328
                                .await?;
×
1329
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1330
                        }
1331
                        Err(TransactionConfirmabilityError::AlreadySpentInput(index)) => {
×
1332
                            warn!("already spent input (at index {index})");
×
1333
                            self.punish(NegativePeerSanction::DoubleSpendingTransaction)
×
1334
                                .await?;
×
1335
                            return Ok(KEEP_CONNECTION_ALIVE);
×
1336
                        }
1337
                    };
1338
                }
2✔
1339

2✔
1340
                // If transaction cannot be applied to mutator set, punish.
2✔
1341
                // I don't think this can happen when above checks pass but we include
2✔
1342
                // the check to ensure that transaction can be applied.
2✔
1343
                let ms_update = MutatorSetUpdate::new(
2✔
1344
                    transaction.kernel.inputs.clone(),
2✔
1345
                    transaction.kernel.outputs.clone(),
2✔
1346
                );
2✔
1347
                let can_apply = ms_update
2✔
1348
                    .apply_to_accumulator(&mut mutator_set_accumulator_after.clone())
2✔
1349
                    .is_ok();
2✔
1350
                if !can_apply {
2✔
1351
                    warn!("Cannot apply transaction to current mutator set.");
×
1352
                    warn!("Transaction ID: {}", transaction.kernel.txid());
×
1353
                    self.punish(NegativePeerSanction::CannotApplyTransactionToMutatorSet)
×
1354
                        .await?;
×
1355
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1356
                }
2✔
1357

2✔
1358
                let tx_timestamp = transaction.kernel.timestamp;
2✔
1359

2✔
1360
                // 6. Ignore if transaction is too old
2✔
1361
                let now = self.now();
2✔
1362
                if tx_timestamp < now - Timestamp::seconds(MEMPOOL_TX_THRESHOLD_AGE_IN_SECS) {
2✔
1363
                    // TODO: Consider punishing here
1364
                    warn!("Received too old tx");
×
1365
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1366
                }
2✔
1367

2✔
1368
                // 7. Ignore if transaction is too far into the future
2✔
1369
                if tx_timestamp
2✔
1370
                    > now + Timestamp::seconds(MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD)
2✔
1371
                {
1372
                    // TODO: Consider punishing here
1373
                    warn!("Received tx too far into the future. Got timestamp: {tx_timestamp:?}");
×
1374
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1375
                }
2✔
1376

2✔
1377
                // Otherwise, relay to main
2✔
1378
                let pt2m_transaction = PeerTaskToMainTransaction {
2✔
1379
                    transaction,
2✔
1380
                    confirmable_for_block: tip,
2✔
1381
                };
2✔
1382
                self.to_main_tx
2✔
1383
                    .send(PeerTaskToMain::Transaction(Box::new(pt2m_transaction)))
2✔
1384
                    .await?;
2✔
1385

1386
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1387
            }
1388
            PeerMessage::TransactionNotification(tx_notification) => {
6✔
1389
                // addresses #457
6✔
1390
                // new scope for state read-lock to avoid holding across peer.send()
6✔
1391
                {
6✔
1392
                    log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");
6✔
1393

1394
                    // 1. Ignore if we already know this transaction, and
1395
                    // the proof quality is not higher than what we already know.
1396
                    let state = self.global_state_lock.lock_guard().await;
6✔
1397
                    let transaction_of_same_or_higher_proof_quality_is_known =
6✔
1398
                        state.mempool.contains_with_higher_proof_quality(
6✔
1399
                            tx_notification.txid,
6✔
1400
                            tx_notification.proof_quality,
6✔
1401
                        );
6✔
1402
                    if transaction_of_same_or_higher_proof_quality_is_known {
6✔
1403
                        debug!("transaction with same or higher proof quality was already known");
4✔
1404
                        return Ok(KEEP_CONNECTION_ALIVE);
4✔
1405
                    }
2✔
1406

2✔
1407
                    // Only accept transactions that do not require executing
2✔
1408
                    // `update`.
2✔
1409
                    if state
2✔
1410
                        .chain
2✔
1411
                        .light_state()
2✔
1412
                        .mutator_set_accumulator_after()
2✔
1413
                        .hash()
2✔
1414
                        != tx_notification.mutator_set_hash
2✔
1415
                    {
1416
                        debug!("transaction refers to non-canonical mutator set state");
×
1417
                        return Ok(KEEP_CONNECTION_ALIVE);
×
1418
                    }
2✔
1419
                }
2✔
1420

2✔
1421
                // 2. Request the actual `Transaction` from peer
2✔
1422
                debug!("requesting transaction from peer");
2✔
1423
                peer.send(PeerMessage::TransactionRequest(tx_notification.txid))
2✔
1424
                    .await?;
2✔
1425

1426
                Ok(KEEP_CONNECTION_ALIVE)
2✔
1427
            }
1428
            PeerMessage::TransactionRequest(transaction_identifier) => {
2✔
1429
                let state = self.global_state_lock.lock_guard().await;
2✔
1430
                let Some(transaction) = state.mempool.get(transaction_identifier) else {
2✔
1431
                    return Ok(KEEP_CONNECTION_ALIVE);
1✔
1432
                };
1433

1434
                let Ok(transfer_transaction) = transaction.try_into() else {
1✔
NEW
1435
                    warn!("Peer requested transaction that cannot be converted to transfer object");
×
NEW
1436
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1437
                };
1438

1439
                // Drop state immediately to prevent holding over a response.
1440
                drop(state);
1✔
1441

1✔
1442
                peer.send(PeerMessage::Transaction(Box::new(transfer_transaction)))
1✔
1443
                    .await?;
1✔
1444

1445
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1446
            }
1447
            PeerMessage::BlockProposalNotification(block_proposal_notification) => {
1✔
1448
                let verdict = self
1✔
1449
                    .global_state_lock
1✔
1450
                    .lock_guard()
1✔
1451
                    .await
1✔
1452
                    .favor_incoming_block_proposal(
1✔
1453
                        block_proposal_notification.height,
1✔
1454
                        block_proposal_notification.guesser_fee,
1✔
1455
                    );
1✔
1456
                match verdict {
1✔
1457
                    Ok(_) => {
1458
                        peer.send(PeerMessage::BlockProposalRequest(
1✔
1459
                            BlockProposalRequest::new(block_proposal_notification.body_mast_hash),
1✔
1460
                        ))
1✔
1461
                        .await?
1✔
1462
                    }
1463
                    Err(reject_reason) => {
×
1464
                        info!(
×
1465
                        "Rejecting notification of block proposal with guesser fee {} from peer \
×
1466
                        {}. Reason:\n{reject_reason}",
×
1467
                        block_proposal_notification.guesser_fee.display_n_decimals(5),
×
1468
                        self.peer_address
1469
                    )
1470
                    }
1471
                }
1472

1473
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1474
            }
1475
            PeerMessage::BlockProposalRequest(block_proposal_request) => {
×
1476
                let matching_proposal = self
×
1477
                    .global_state_lock
×
1478
                    .lock_guard()
×
1479
                    .await
×
1480
                    .mining_state
1481
                    .block_proposal
1482
                    .filter(|x| x.body().mast_hash() == block_proposal_request.body_mast_hash)
×
1483
                    .map(|x| x.to_owned());
×
1484
                if let Some(proposal) = matching_proposal {
×
1485
                    peer.send(PeerMessage::BlockProposal(Box::new(proposal)))
×
1486
                        .await?;
×
1487
                } else {
1488
                    self.punish(NegativePeerSanction::BlockProposalNotFound)
×
1489
                        .await?;
×
1490
                }
1491

1492
                Ok(KEEP_CONNECTION_ALIVE)
×
1493
            }
1494
            PeerMessage::BlockProposal(block) => {
1✔
1495
                info!("Got block proposal from peer.");
1✔
1496

1497
                let should_punish = {
1✔
1498
                    log_slow_scope!(fn_name!() + "::PeerMessage::BlockProposal::should_punish");
1✔
1499

1500
                    let (verdict, tip) = {
1✔
1501
                        let state = self.global_state_lock.lock_guard().await;
1✔
1502

1503
                        let verdict = state.favor_incoming_block_proposal(
1✔
1504
                            block.header().height,
1✔
1505
                            block.total_guesser_reward(),
1✔
1506
                        );
1✔
1507
                        let tip = state.chain.light_state().to_owned();
1✔
1508
                        (verdict, tip)
1✔
1509
                    };
1510

1511
                    if let Err(rejection_reason) = verdict {
1✔
1512
                        match rejection_reason {
×
1513
                            // no need to punish and log if the fees are equal.  we just ignore the incoming proposal.
1514
                            BlockProposalRejectError::InsufficientFee { current, received }
×
1515
                                if Some(received) == current =>
×
1516
                            {
×
1517
                                debug!("ignoring new block proposal because the fee is equal to the present one");
×
1518
                                None
×
1519
                            }
1520
                            _ => {
1521
                                warn!("Rejecting new block proposal:\n{rejection_reason}");
×
1522
                                Some(NegativePeerSanction::NonFavorableBlockProposal)
×
1523
                            }
1524
                        }
1525
                    } else {
1526
                        // Verify validity and that proposal is child of current tip
1527
                        if block.is_valid(&tip, self.now()).await {
1✔
1528
                            None // all is well, no punishment.
1✔
1529
                        } else {
1530
                            Some(NegativePeerSanction::InvalidBlockProposal)
×
1531
                        }
1532
                    }
1533
                };
1534

1535
                if let Some(sanction) = should_punish {
1✔
1536
                    self.punish(sanction).await?;
×
1537
                } else {
1538
                    self.send_to_main(PeerTaskToMain::BlockProposal(block), line!())
1✔
1539
                        .await?;
1✔
1540

1541
                    // Valuable, new, hard-to-produce information. Reward peer.
1542
                    self.reward(PositivePeerSanction::NewBlockProposal).await?;
1✔
1543
                }
1544

1545
                Ok(KEEP_CONNECTION_ALIVE)
1✔
1546
            }
1547
        }
1548
    }
93✔
1549

1550
    /// send msg to main via mpsc channel `to_main_tx` and logs if slow.
1551
    ///
1552
    /// the channel could potentially fill up in which case the send() will
1553
    /// block until there is capacity.  we wrap the send() so we can log if
1554
    /// that ever happens to the extent it passes slow-scope threshold.
1555
    async fn send_to_main(
1✔
1556
        &self,
1✔
1557
        msg: PeerTaskToMain,
1✔
1558
        line: u32,
1✔
1559
    ) -> Result<(), tokio::sync::mpsc::error::SendError<PeerTaskToMain>> {
1✔
1560
        // we measure across the send() in case the channel ever fills up.
1✔
1561
        log_slow_scope!(fn_name!() + &format!("peer_loop.rs:{}", line));
1✔
1562

1✔
1563
        self.to_main_tx.send(msg).await
1✔
1564
    }
1✔
1565

1566
    /// Handle message from main task. The boolean return value indicates if
1567
    /// the connection should be closed.
1568
    ///
1569
    /// Locking:
1570
    ///   * acquires `global_state_lock` for write via Self::punish()
1571
    async fn handle_main_task_message<S>(
×
1572
        &mut self,
×
1573
        msg: MainToPeerTask,
×
1574
        peer: &mut S,
×
1575
        peer_state_info: &mut MutablePeerState,
×
1576
    ) -> Result<bool>
×
1577
    where
×
1578
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
×
1579
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
×
1580
        <S as TryStream>::Error: std::error::Error,
×
1581
    {
×
1582
        debug!("Handling {} message from main in peer loop", msg.get_type());
×
1583
        match msg {
×
1584
            MainToPeerTask::Block(block) => {
×
1585
                // We don't currently differentiate whether a new block came from a peer, or from our
×
1586
                // own miner. It's always shared through this logic.
×
1587
                let new_block_height = block.kernel.header.height;
×
1588
                if new_block_height > peer_state_info.highest_shared_block_height {
×
1589
                    debug!("Sending PeerMessage::BlockNotification");
×
1590
                    peer_state_info.highest_shared_block_height = new_block_height;
×
1591
                    peer.send(PeerMessage::BlockNotification(block.as_ref().into()))
×
1592
                        .await?;
×
1593
                    debug!("Sent PeerMessage::BlockNotification");
×
1594
                }
×
1595
                Ok(KEEP_CONNECTION_ALIVE)
×
1596
            }
1597
            MainToPeerTask::RequestBlockBatch(batch_block_request) => {
×
1598
                // Only ask one of the peers about the batch of blocks
×
1599
                if batch_block_request.peer_addr_target != self.peer_address {
×
1600
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1601
                }
×
1602

×
1603
                let max_response_len = std::cmp::min(
×
1604
                    STANDARD_BLOCK_BATCH_SIZE,
×
1605
                    self.global_state_lock.cli().sync_mode_threshold,
×
1606
                );
×
1607

×
1608
                peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
×
1609
                    known_blocks: batch_block_request.known_blocks,
×
1610
                    max_response_len,
×
1611
                    anchor: batch_block_request.anchor_mmr,
×
1612
                }))
×
1613
                .await?;
×
1614

1615
                Ok(KEEP_CONNECTION_ALIVE)
×
1616
            }
1617
            MainToPeerTask::PeerSynchronizationTimeout(socket_addr) => {
×
1618
                log_slow_scope!(fn_name!() + "::MainToPeerTask::PeerSynchronizationTimeout");
×
1619

×
1620
                if self.peer_address != socket_addr {
×
1621
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1622
                }
×
1623

×
1624
                self.punish(NegativePeerSanction::SynchronizationTimeout)
×
1625
                    .await?;
×
1626

1627
                // If this peer failed the last synchronization attempt, we only
1628
                // sanction, we don't disconnect.
1629
                Ok(KEEP_CONNECTION_ALIVE)
×
1630
            }
1631
            MainToPeerTask::MakePeerDiscoveryRequest => {
1632
                peer.send(PeerMessage::PeerListRequest).await?;
×
1633
                Ok(KEEP_CONNECTION_ALIVE)
×
1634
            }
1635
            MainToPeerTask::Disconnect(peer_address) => {
×
1636
                log_slow_scope!(fn_name!() + "::MainToPeerTask::Disconnect");
×
1637

×
1638
                // Only disconnect from the peer the main task requested a disconnect for.
×
1639
                if peer_address != self.peer_address {
×
1640
                    return Ok(KEEP_CONNECTION_ALIVE);
×
1641
                }
×
1642
                self.register_peer_disconnection().await;
×
1643

1644
                Ok(DISCONNECT_CONNECTION)
×
1645
            }
1646
            MainToPeerTask::DisconnectAll() => {
1647
                self.register_peer_disconnection().await;
×
1648

1649
                Ok(DISCONNECT_CONNECTION)
×
1650
            }
1651
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(target_socket_addr) => {
×
1652
                if target_socket_addr == self.peer_address {
×
1653
                    peer.send(PeerMessage::PeerListRequest).await?;
×
1654
                }
×
1655
                Ok(KEEP_CONNECTION_ALIVE)
×
1656
            }
1657
            MainToPeerTask::TransactionNotification(transaction_notification) => {
×
1658
                debug!("Sending PeerMessage::TransactionNotification");
×
1659
                peer.send(PeerMessage::TransactionNotification(
×
1660
                    transaction_notification,
×
1661
                ))
×
1662
                .await?;
×
1663
                debug!("Sent PeerMessage::TransactionNotification");
×
1664
                Ok(KEEP_CONNECTION_ALIVE)
×
1665
            }
1666
            MainToPeerTask::BlockProposalNotification(block_proposal_notification) => {
×
1667
                debug!("Sending PeerMessage::BlockProposalNotification");
×
1668
                peer.send(PeerMessage::BlockProposalNotification(
×
1669
                    block_proposal_notification,
×
1670
                ))
×
1671
                .await?;
×
1672
                debug!("Sent PeerMessage::BlockProposalNotification");
×
1673
                Ok(KEEP_CONNECTION_ALIVE)
×
1674
            }
1675
        }
1676
    }
×
1677

1678
    /// Loop for the peer tasks. Awaits either a message from the peer over TCP,
1679
    /// or a message from main over the main-to-peer-tasks broadcast channel.
1680
    async fn run<S>(
41✔
1681
        &mut self,
41✔
1682
        mut peer: S,
41✔
1683
        mut from_main_rx: broadcast::Receiver<MainToPeerTask>,
41✔
1684
        peer_state_info: &mut MutablePeerState,
41✔
1685
    ) -> Result<()>
41✔
1686
    where
41✔
1687
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
41✔
1688
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
41✔
1689
        <S as TryStream>::Error: std::error::Error,
41✔
1690
    {
41✔
1691
        loop {
1692
            select! {
93✔
1693
                // Handle peer messages
1694
                peer_message = peer.try_next() => {
93✔
1695
                    let peer_address = self.peer_address;
93✔
1696
                    let peer_message = match peer_message {
93✔
1697
                        Ok(message) => message,
93✔
1698
                        Err(err) => {
×
1699
                            let msg = format!("Error when receiving from peer: {peer_address}");
×
1700
                            error!("{msg}. Error: {err}");
×
1701
                            bail!("{msg}. Closing connection.");
×
1702
                        }
1703
                    };
1704
                    let Some(peer_message) = peer_message else {
93✔
1705
                        info!("Peer {peer_address} closed connection.");
×
1706
                        break;
×
1707
                    };
1708

1709
                    let syncing =
93✔
1710
                        self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
93✔
1711
                    let message_type = peer_message.get_type();
93✔
1712
                    if peer_message.ignore_during_sync() && syncing {
93✔
1713
                        debug!(
×
1714
                            "Ignoring {message_type} message when syncing, from {peer_address}",
×
1715
                        );
1716
                        continue;
×
1717
                    }
93✔
1718
                    if peer_message.ignore_when_not_sync() && !syncing {
93✔
1719
                        debug!(
×
1720
                            "Ignoring {message_type} message when not syncing, from {peer_address}",
×
1721
                        );
1722
                        continue;
×
1723
                    }
93✔
1724

93✔
1725
                    match self
93✔
1726
                        .handle_peer_message(peer_message, &mut peer, peer_state_info)
93✔
1727
                        .await
93✔
1728
                    {
1729
                        Ok(false) => {}
52✔
1730
                        Ok(true) => {
1731
                            info!("Closing connection to {peer_address}");
40✔
1732
                            break;
40✔
1733
                        }
1734
                        Err(err) => {
1✔
1735
                            warn!("Closing connection to {peer_address} because of error {err}.");
1✔
1736
                            bail!("{err}");
1✔
1737
                        }
1738
                    };
1739
                }
1740

1741
                // Handle messages from main task
1742
                main_msg_res = from_main_rx.recv() => {
93✔
1743
                    let main_msg = main_msg_res
×
1744
                        .unwrap_or_else(|e| panic!("Failed to read from main loop: {e}"));
×
1745
                    let close_connection = self
×
1746
                        .handle_main_task_message(main_msg, &mut peer, peer_state_info)
×
1747
                        .await
×
1748
                        .unwrap_or_else(|err| {
×
1749
                            warn!("handle_main_task_message returned an error: {err}");
×
1750
                            true
×
1751
                        });
×
1752

×
1753
                    if close_connection {
×
1754
                        info!(
×
1755
                            "handle_main_task_message is closing the connection to {}",
×
1756
                            self.peer_address
1757
                        );
1758
                        break;
×
1759
                    }
×
1760
                }
1761
            }
1762
        }
1763

1764
        Ok(())
40✔
1765
    }
41✔
1766

1767
    /// Function called before entering the peer loop. Reads the potentially stored
1768
    /// peer standing from the database and does other book-keeping before entering
1769
    /// its final resting place: the `peer_loop`. Note that the peer has already been
1770
    /// accepted for a connection for this loop to be entered. So we don't need
1771
    /// to check the standing again.
1772
    ///
1773
    /// Locking:
1774
    ///   * acquires `global_state_lock` for write
1775
    pub(crate) async fn run_wrapper<S>(
32✔
1776
        &mut self,
32✔
1777
        mut peer: S,
32✔
1778
        from_main_rx: broadcast::Receiver<MainToPeerTask>,
32✔
1779
    ) -> Result<()>
32✔
1780
    where
32✔
1781
        S: Sink<PeerMessage> + TryStream<Ok = PeerMessage> + Unpin,
32✔
1782
        <S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
32✔
1783
        <S as TryStream>::Error: std::error::Error,
32✔
1784
    {
32✔
1785
        const TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS: i128 = 120;
1786

1787
        let cli_args = self.global_state_lock.cli().clone();
32✔
1788

1789
        let standing = self
32✔
1790
            .global_state_lock
32✔
1791
            .lock_guard()
32✔
1792
            .await
32✔
1793
            .net
1794
            .peer_databases
1795
            .peer_standings
1796
            .get(self.peer_address.ip())
32✔
1797
            .await
32✔
1798
            .unwrap_or_else(|| PeerStanding::new(cli_args.peer_tolerance));
31✔
1799

31✔
1800
        // Add peer to peer map
31✔
1801
        let peer_connection_info = PeerConnectionInfo::new(
31✔
1802
            self.peer_handshake_data.listen_port,
31✔
1803
            self.peer_address,
31✔
1804
            self.inbound_connection,
31✔
1805
        );
31✔
1806
        let new_peer = PeerInfo::new(
31✔
1807
            peer_connection_info,
31✔
1808
            &self.peer_handshake_data,
31✔
1809
            SystemTime::now(),
31✔
1810
            cli_args.peer_tolerance,
31✔
1811
        )
31✔
1812
        .with_standing(standing);
31✔
1813

31✔
1814
        // If timestamps are different, we currently just log a warning.
31✔
1815
        let peer_clock_ahead_in_seconds = new_peer.time_difference_in_seconds();
31✔
1816
        let own_clock_ahead_in_seconds = -peer_clock_ahead_in_seconds;
31✔
1817
        if peer_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
31✔
1818
            || own_clock_ahead_in_seconds > TIME_DIFFERENCE_WARN_THRESHOLD_IN_SECONDS
31✔
1819
        {
1820
            let own_datetime_utc: DateTime<Utc> =
×
1821
                new_peer.own_timestamp_connection_established.into();
×
1822
            let peer_datetime_utc: DateTime<Utc> =
×
1823
                new_peer.peer_timestamp_connection_established.into();
×
1824
            warn!(
×
1825
                "New peer {} disagrees with us about time. Peer reports time {} but our clock at handshake was {}.",
×
1826
                new_peer.connected_address(),
×
1827
                peer_datetime_utc.format("%Y-%m-%d %H:%M:%S"),
×
1828
                own_datetime_utc.format("%Y-%m-%d %H:%M:%S"));
×
1829
        }
31✔
1830

1831
        // Multiple tasks might attempt to set up a connection concurrently. So
1832
        // even though we've checked that this connection is allowed, this check
1833
        // could have been invalidated by another task, for one accepting an
1834
        // incoming connection from a peer we're currently connecting to. So we
1835
        // need to make the a check again while holding a write-lock, since
1836
        // we're modifying `peer_map` here. Holding a read-lock doesn't work
1837
        // since it would have to be dropped before acquiring the write-lock.
1838
        {
1839
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
31✔
1840
            let peer_map = &mut global_state.net.peer_map;
31✔
1841
            if peer_map
31✔
1842
                .values()
31✔
1843
                .any(|pi| pi.instance_id() == self.peer_handshake_data.instance_id)
31✔
1844
            {
1845
                bail!("Attempted to connect to already connected peer. Aborting connection.");
×
1846
            }
31✔
1847

31✔
1848
            if peer_map.len() >= cli_args.max_num_peers {
31✔
1849
                bail!("Attempted to connect to more peers than allowed. Aborting connection.");
×
1850
            }
31✔
1851

31✔
1852
            if peer_map.contains_key(&self.peer_address) {
31✔
1853
                // This shouldn't be possible, unless the peer reports a different instance ID than
1854
                // for the other connection. Only a malignant client would do that.
1855
                bail!("Already connected to peer. Aborting connection");
×
1856
            }
31✔
1857

31✔
1858
            peer_map.insert(self.peer_address, new_peer);
31✔
1859
        }
31✔
1860

31✔
1861
        // `MutablePeerState` contains the part of the peer-loop's state that is mutable
31✔
1862
        let mut peer_state = MutablePeerState::new(self.peer_handshake_data.tip_header.height);
31✔
1863

31✔
1864
        // If peer indicates more canonical block, request a block notification to catch up ASAP
31✔
1865
        if self.peer_handshake_data.tip_header.cumulative_proof_of_work
31✔
1866
            > self
31✔
1867
                .global_state_lock
31✔
1868
                .lock_guard()
31✔
1869
                .await
31✔
1870
                .chain
1871
                .light_state()
31✔
1872
                .kernel
1873
                .header
1874
                .cumulative_proof_of_work
1875
        {
1876
            // Send block notification request to catch up ASAP, in case we're
1877
            // behind the newly-connected peer.
1878
            peer.send(PeerMessage::BlockNotificationRequest).await?;
×
1879
        }
31✔
1880

1881
        let res = self.run(peer, from_main_rx, &mut peer_state).await;
31✔
1882
        debug!("Exited peer loop for {}", self.peer_address);
31✔
1883

1884
        close_peer_connected_callback(
31✔
1885
            self.global_state_lock.clone(),
31✔
1886
            self.peer_address,
31✔
1887
            &self.to_main_tx,
31✔
1888
        )
31✔
1889
        .await;
31✔
1890

1891
        debug!("Ending peer loop for {}", self.peer_address);
31✔
1892

1893
        // Return any error that `run` returned. Returning and not suppressing errors is a quite nice
1894
        // feature to have for testing purposes.
1895
        res
31✔
1896
    }
31✔
1897

1898
    /// Register graceful peer disconnection in the global state.
1899
    ///
1900
    /// See also [`NetworkingState::register_peer_disconnection`][1].
1901
    ///
1902
    /// # Locking:
1903
    ///   * acquires `global_state_lock` for write
1904
    ///
1905
    /// [1]: crate::models::state::networking_state::NetworkingState::register_peer_disconnection
1906
    async fn register_peer_disconnection(&mut self) {
×
1907
        let peer_id = self.peer_handshake_data.instance_id;
×
1908
        self.global_state_lock
×
1909
            .lock_guard_mut()
×
1910
            .await
×
1911
            .net
1912
            .register_peer_disconnection(peer_id, SystemTime::now());
×
1913
    }
×
1914
}
1915

1916
#[cfg(test)]
1917
mod peer_loop_tests {
1918
    use rand::rngs::StdRng;
1919
    use rand::Rng;
1920
    use rand::SeedableRng;
1921
    use tokio::sync::mpsc::error::TryRecvError;
1922
    use tracing_test::traced_test;
1923

1924
    use super::*;
1925
    use crate::config_models::cli_args;
1926
    use crate::config_models::network::Network;
1927
    use crate::job_queue::triton_vm::TritonVmJobQueue;
1928
    use crate::models::blockchain::block::block_header::TARGET_BLOCK_INTERVAL;
1929
    use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
1930
    use crate::models::peer::peer_block_notifications::PeerBlockNotification;
1931
    use crate::models::peer::transaction_notification::TransactionNotification;
1932
    use crate::models::state::mempool::TransactionOrigin;
1933
    use crate::models::state::tx_proving_capability::TxProvingCapability;
1934
    use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
1935
    use crate::models::state::wallet::wallet_entropy::WalletEntropy;
1936
    use crate::tests::shared::fake_valid_block_for_tests;
1937
    use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests;
1938
    use crate::tests::shared::get_dummy_handshake_data_for_genesis;
1939
    use crate::tests::shared::get_dummy_peer_connection_data_genesis;
1940
    use crate::tests::shared::get_dummy_socket_address;
1941
    use crate::tests::shared::get_test_genesis_setup;
1942
    use crate::tests::shared::invalid_empty_single_proof_transaction;
1943
    use crate::tests::shared::Action;
1944
    use crate::tests::shared::Mock;
1945

1946
    #[traced_test]
×
1947
    #[tokio::test]
1948
    async fn test_peer_loop_bye() -> Result<()> {
1✔
1949
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
1950

1✔
1951
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
1952
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default()).await?;
1✔
1953

1✔
1954
        let peer_address = get_dummy_socket_address(2);
1✔
1955
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
1956
        let mut peer_loop_handler =
1✔
1957
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), peer_address, hsd, true, 1);
1✔
1958
        peer_loop_handler
1✔
1959
            .run_wrapper(mock, from_main_rx_clone)
1✔
1960
            .await?;
1✔
1961

1✔
1962
        assert_eq!(
1✔
1963
            2,
1✔
1964
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
1965
            "peer map length must be back to 2 after goodbye"
1✔
1966
        );
1✔
1967

1✔
1968
        Ok(())
1✔
1969
    }
1✔
1970

1971
    #[traced_test]
×
1972
    #[tokio::test]
1973
    async fn test_peer_loop_peer_list() {
1✔
1974
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, _hsd) =
1✔
1975
            get_test_genesis_setup(Network::Alpha, 2, cli_args::Args::default())
1✔
1976
                .await
1✔
1977
                .unwrap();
1✔
1978

1✔
1979
        let mut peer_infos = state_lock
1✔
1980
            .lock_guard()
1✔
1981
            .await
1✔
1982
            .net
1✔
1983
            .peer_map
1✔
1984
            .clone()
1✔
1985
            .into_values()
1✔
1986
            .collect::<Vec<_>>();
1✔
1987
        peer_infos.sort_by_cached_key(|x| x.connected_address());
2✔
1988
        let (peer_address0, instance_id0) = (
1✔
1989
            peer_infos[0].connected_address(),
1✔
1990
            peer_infos[0].instance_id(),
1✔
1991
        );
1✔
1992
        let (peer_address1, instance_id1) = (
1✔
1993
            peer_infos[1].connected_address(),
1✔
1994
            peer_infos[1].instance_id(),
1✔
1995
        );
1✔
1996

1✔
1997
        let (hsd2, sa2) = get_dummy_peer_connection_data_genesis(Network::Alpha, 2);
1✔
1998
        let expected_response = vec![
1✔
1999
            (peer_address0, instance_id0),
1✔
2000
            (peer_address1, instance_id1),
1✔
2001
            (sa2, hsd2.instance_id),
1✔
2002
        ];
1✔
2003
        let mock = Mock::new(vec![
1✔
2004
            Action::Read(PeerMessage::PeerListRequest),
1✔
2005
            Action::Write(PeerMessage::PeerListResponse(expected_response)),
1✔
2006
            Action::Read(PeerMessage::Bye),
1✔
2007
        ]);
1✔
2008

1✔
2009
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2010

1✔
2011
        let mut peer_loop_handler =
1✔
2012
            PeerLoopHandler::new(to_main_tx, state_lock.clone(), sa2, hsd2, true, 0);
1✔
2013
        peer_loop_handler
1✔
2014
            .run_wrapper(mock, from_main_rx_clone)
1✔
2015
            .await
1✔
2016
            .unwrap();
1✔
2017

1✔
2018
        assert_eq!(
1✔
2019
            2,
1✔
2020
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
2021
            "peer map must have length 2 after saying goodbye to peer 2"
1✔
2022
        );
1✔
2023
    }
1✔
2024

2025
    #[traced_test]
×
2026
    #[tokio::test]
2027
    async fn different_genesis_test() -> Result<()> {
1✔
2028
        // In this scenario a peer provides another genesis block than what has been
1✔
2029
        // hardcoded. This should lead to the closing of the connection to this peer
1✔
2030
        // and a ban.
1✔
2031

1✔
2032
        let network = Network::Main;
1✔
2033
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2034
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2035
        assert_eq!(1000, state_lock.cli().peer_tolerance);
1✔
2036
        let peer_address = get_dummy_socket_address(0);
1✔
2037

1✔
2038
        // Although the database is empty, `get_latest_block` still returns the genesis block,
1✔
2039
        // since that block is hardcoded.
1✔
2040
        let mut different_genesis_block = state_lock
1✔
2041
            .lock_guard()
1✔
2042
            .await
1✔
2043
            .chain
1✔
2044
            .archival_state()
1✔
2045
            .get_tip()
1✔
2046
            .await;
1✔
2047

1✔
2048
        different_genesis_block.set_header_nonce(StdRng::seed_from_u64(5550001).random());
1✔
2049
        let [block_1_with_different_genesis] = fake_valid_sequence_of_blocks_for_tests(
1✔
2050
            &different_genesis_block,
1✔
2051
            Timestamp::hours(1),
1✔
2052
            StdRng::seed_from_u64(5550001).random(),
1✔
2053
        )
1✔
2054
        .await;
1✔
2055
        let mock = Mock::new(vec![Action::Read(PeerMessage::Block(Box::new(
1✔
2056
            block_1_with_different_genesis.try_into().unwrap(),
1✔
2057
        )))]);
1✔
2058

1✔
2059
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2060
            to_main_tx.clone(),
1✔
2061
            state_lock.clone(),
1✔
2062
            peer_address,
1✔
2063
            hsd,
1✔
2064
            true,
1✔
2065
            1,
1✔
2066
        );
1✔
2067
        let res = peer_loop_handler
1✔
2068
            .run_wrapper(mock, from_main_rx_clone)
1✔
2069
            .await;
1✔
2070
        assert!(
1✔
2071
            res.is_err(),
1✔
2072
            "run_wrapper must return failure when genesis is different"
1✔
2073
        );
1✔
2074

1✔
2075
        match to_main_rx1.recv().await {
1✔
2076
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2077
            _ => bail!("Must receive remove of peer block max height"),
1✔
2078
        }
1✔
2079

1✔
2080
        // Verify that no further message was sent to main loop
1✔
2081
        match to_main_rx1.try_recv() {
1✔
2082
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2083
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2084
        };
1✔
2085

1✔
2086
        drop(to_main_tx);
1✔
2087

1✔
2088
        let peer_standing = state_lock
1✔
2089
            .lock_guard()
1✔
2090
            .await
1✔
2091
            .net
1✔
2092
            .get_peer_standing_from_database(peer_address.ip())
1✔
2093
            .await;
1✔
2094
        assert_eq!(
1✔
2095
            -i32::from(state_lock.cli().peer_tolerance),
1✔
2096
            peer_standing.unwrap().standing
1✔
2097
        );
1✔
2098
        assert_eq!(
1✔
2099
            NegativePeerSanction::DifferentGenesis,
1✔
2100
            peer_standing.unwrap().latest_punishment.unwrap().0
1✔
2101
        );
1✔
2102

1✔
2103
        Ok(())
1✔
2104
    }
1✔
2105

2106
    #[traced_test]
×
2107
    #[tokio::test]
2108
    async fn node_does_not_record_disconnection_time_when_peer_initiates_disconnect() -> Result<()>
1✔
2109
    {
1✔
2110
        let args = cli_args::Args::default();
1✔
2111
        let network = args.network;
1✔
2112
        let (from_main_tx, from_main_rx, to_main_tx, to_main_rx, state_lock, _) =
1✔
2113
            get_test_genesis_setup(network, 0, args).await?;
1✔
2114

1✔
2115
        let peer_address = get_dummy_socket_address(0);
1✔
2116
        let peer_handshake_data = get_dummy_handshake_data_for_genesis(network);
1✔
2117
        let peer_id = peer_handshake_data.instance_id;
1✔
2118
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2119
            to_main_tx,
1✔
2120
            state_lock.clone(),
1✔
2121
            peer_address,
1✔
2122
            peer_handshake_data,
1✔
2123
            true,
1✔
2124
            1,
1✔
2125
        );
1✔
2126
        let mock = Mock::new(vec![Action::Read(PeerMessage::Bye)]);
1✔
2127
        peer_loop_handler.run_wrapper(mock, from_main_rx).await?;
1✔
2128

1✔
2129
        let global_state = state_lock.lock_guard().await;
1✔
2130
        assert!(global_state
1✔
2131
            .net
1✔
2132
            .last_disconnection_time_of_peer(peer_id)
1✔
2133
            .is_none());
1✔
2134

1✔
2135
        drop(to_main_rx);
1✔
2136
        drop(from_main_tx);
1✔
2137

1✔
2138
        Ok(())
1✔
2139
    }
1✔
2140

2141
    #[traced_test]
×
2142
    #[tokio::test]
2143
    async fn block_without_valid_pow_test() -> Result<()> {
1✔
2144
        // In this scenario, a block without a valid PoW is received. This block should be rejected
1✔
2145
        // by the peer loop and a notification should never reach the main loop.
1✔
2146

1✔
2147
        let network = Network::Main;
1✔
2148
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2149
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2150
        let peer_address = get_dummy_socket_address(0);
1✔
2151
        let genesis_block: Block = state_lock
1✔
2152
            .lock_guard()
1✔
2153
            .await
1✔
2154
            .chain
1✔
2155
            .archival_state()
1✔
2156
            .get_tip()
1✔
2157
            .await;
1✔
2158

1✔
2159
        // Make a with hash above what the implied threshold from
1✔
2160
        let [mut block_without_valid_pow] = fake_valid_sequence_of_blocks_for_tests(
1✔
2161
            &genesis_block,
1✔
2162
            Timestamp::hours(1),
1✔
2163
            StdRng::seed_from_u64(5550001).random(),
1✔
2164
        )
1✔
2165
        .await;
1✔
2166

1✔
2167
        // This *probably* is invalid PoW -- and needs to be for this test to
1✔
2168
        // work.
1✔
2169
        block_without_valid_pow.set_header_nonce(Digest::default());
1✔
2170

1✔
2171
        // Sending an invalid block will not necessarily result in a ban. This depends on the peer
1✔
2172
        // tolerance that is set in the client. For this reason, we include a "Bye" here.
1✔
2173
        let mock = Mock::new(vec![
1✔
2174
            Action::Read(PeerMessage::Block(Box::new(
1✔
2175
                block_without_valid_pow.clone().try_into().unwrap(),
1✔
2176
            ))),
1✔
2177
            Action::Read(PeerMessage::Bye),
1✔
2178
        ]);
1✔
2179

1✔
2180
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2181

1✔
2182
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2183
            to_main_tx.clone(),
1✔
2184
            state_lock.clone(),
1✔
2185
            peer_address,
1✔
2186
            hsd,
1✔
2187
            true,
1✔
2188
            1,
1✔
2189
            block_without_valid_pow.header().timestamp,
1✔
2190
        );
1✔
2191
        peer_loop_handler
1✔
2192
            .run_wrapper(mock, from_main_rx_clone)
1✔
2193
            .await
1✔
2194
            .expect("sending (one) invalid block should not result in closed connection");
1✔
2195

1✔
2196
        match to_main_rx1.recv().await {
1✔
2197
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2198
            _ => bail!("Must receive remove of peer block max height"),
1✔
2199
        }
1✔
2200

1✔
2201
        // Verify that no further message was sent to main loop
1✔
2202
        match to_main_rx1.try_recv() {
1✔
2203
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2204
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2205
        };
1✔
2206

1✔
2207
        // We need to have the transmitter in scope until we have received from it
1✔
2208
        // otherwise the receiver will report the disconnected error when we attempt
1✔
2209
        // to read from it. And the purpose is to verify that the channel is empty,
1✔
2210
        // not that it has been closed.
1✔
2211
        drop(to_main_tx);
1✔
2212

1✔
2213
        // Verify that peer standing was stored in database
1✔
2214
        let standing = state_lock
1✔
2215
            .lock_guard()
1✔
2216
            .await
1✔
2217
            .net
1✔
2218
            .peer_databases
1✔
2219
            .peer_standings
1✔
2220
            .get(peer_address.ip())
1✔
2221
            .await
1✔
2222
            .unwrap();
1✔
2223
        assert!(
1✔
2224
            standing.standing < 0,
1✔
2225
            "Peer must be sanctioned for sending a bad block"
1✔
2226
        );
1✔
2227

1✔
2228
        Ok(())
1✔
2229
    }
1✔
2230

2231
    #[traced_test]
×
2232
    #[tokio::test]
2233
    async fn test_peer_loop_block_with_block_in_db() -> Result<()> {
1✔
2234
        // The scenario tested here is that a client receives a block that is already
1✔
2235
        // known and stored. The expected behavior is to ignore the block and not send
1✔
2236
        // a message to the main task.
1✔
2237

1✔
2238
        let network = Network::Main;
1✔
2239
        let (peer_broadcast_tx, _from_main_rx_clone, to_main_tx, mut to_main_rx1, mut alice, hsd) =
1✔
2240
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2241
        let peer_address = get_dummy_socket_address(0);
1✔
2242
        let genesis_block: Block = Block::genesis(network);
1✔
2243

1✔
2244
        let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
2245
        let block_1 =
1✔
2246
            fake_valid_block_for_tests(&alice, StdRng::seed_from_u64(5550001).random()).await;
1✔
2247
        assert!(
1✔
2248
            block_1.is_valid(&genesis_block, now).await,
1✔
2249
            "Block must be valid for this test to make sense"
1✔
2250
        );
1✔
2251
        alice.set_new_tip(block_1.clone()).await?;
1✔
2252

1✔
2253
        let mock_peer_messages = Mock::new(vec![
1✔
2254
            Action::Read(PeerMessage::Block(Box::new(
1✔
2255
                block_1.clone().try_into().unwrap(),
1✔
2256
            ))),
1✔
2257
            Action::Read(PeerMessage::Bye),
1✔
2258
        ]);
1✔
2259

1✔
2260
        let from_main_rx_clone = peer_broadcast_tx.subscribe();
1✔
2261

1✔
2262
        let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2263
            to_main_tx.clone(),
1✔
2264
            alice.clone(),
1✔
2265
            peer_address,
1✔
2266
            hsd,
1✔
2267
            false,
1✔
2268
            1,
1✔
2269
            block_1.header().timestamp,
1✔
2270
        );
1✔
2271
        alice_peer_loop_handler
1✔
2272
            .run_wrapper(mock_peer_messages, from_main_rx_clone)
1✔
2273
            .await?;
1✔
2274

1✔
2275
        match to_main_rx1.recv().await {
1✔
2276
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2277
            other => bail!("Must receive remove of peer block max height. Got:\n {other:?}"),
1✔
2278
        }
1✔
2279
        match to_main_rx1.try_recv() {
1✔
2280
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2281
            _ => bail!("Block notification must not be sent for block with invalid PoW"),
1✔
2282
        };
1✔
2283
        drop(to_main_tx);
1✔
2284

1✔
2285
        if !alice.lock_guard().await.net.peer_map.is_empty() {
1✔
2286
            bail!("peer map must be empty after closing connection gracefully");
1✔
2287
        }
1✔
2288

1✔
2289
        Ok(())
1✔
2290
    }
1✔
2291

2292
    #[traced_test]
×
2293
    #[tokio::test]
2294
    async fn block_request_batch_simple() {
1✔
2295
        // Scenario: Six blocks (including genesis) are known. Peer requests
1✔
2296
        // from all possible starting points, and client responds with the
1✔
2297
        // correct list of blocks.
1✔
2298
        let network = Network::Main;
1✔
2299
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2300
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2301
                .await
1✔
2302
                .unwrap();
1✔
2303
        let genesis_block: Block = Block::genesis(network);
1✔
2304
        let peer_address = get_dummy_socket_address(0);
1✔
2305
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
2306
            fake_valid_sequence_of_blocks_for_tests(
1✔
2307
                &genesis_block,
1✔
2308
                Timestamp::hours(1),
1✔
2309
                StdRng::seed_from_u64(5550001).random(),
1✔
2310
            )
1✔
2311
            .await;
1✔
2312
        let blocks = vec![
1✔
2313
            genesis_block,
1✔
2314
            block_1,
1✔
2315
            block_2,
1✔
2316
            block_3,
1✔
2317
            block_4,
1✔
2318
            block_5.clone(),
1✔
2319
        ];
1✔
2320
        for block in blocks.iter().skip(1) {
5✔
2321
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
5✔
2322
        }
1✔
2323

1✔
2324
        let mmra = state_lock
1✔
2325
            .lock_guard()
1✔
2326
            .await
1✔
2327
            .chain
1✔
2328
            .archival_state()
1✔
2329
            .archival_block_mmr
1✔
2330
            .ammr()
1✔
2331
            .to_accumulator_async()
1✔
2332
            .await;
1✔
2333
        for i in 0..=4 {
6✔
2334
            let expected_response = {
5✔
2335
                let state = state_lock.lock_guard().await;
5✔
2336
                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
5✔
2337
                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
5✔
2338
                    .await
5✔
2339
                    .unwrap()
5✔
2340
            };
5✔
2341
            let mock = Mock::new(vec![
5✔
2342
                Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
5✔
2343
                    known_blocks: vec![blocks[i].hash()],
5✔
2344
                    max_response_len: 14,
5✔
2345
                    anchor: mmra.clone(),
5✔
2346
                })),
5✔
2347
                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
5✔
2348
                Action::Read(PeerMessage::Bye),
5✔
2349
            ]);
5✔
2350
            let mut peer_loop_handler = PeerLoopHandler::new(
5✔
2351
                to_main_tx.clone(),
5✔
2352
                state_lock.clone(),
5✔
2353
                peer_address,
5✔
2354
                hsd.clone(),
5✔
2355
                false,
5✔
2356
                1,
5✔
2357
            );
5✔
2358

5✔
2359
            peer_loop_handler
5✔
2360
                .run_wrapper(mock, from_main_rx_clone.resubscribe())
5✔
2361
                .await
5✔
2362
                .unwrap();
5✔
2363
        }
1✔
2364
    }
1✔
2365

2366
    #[traced_test]
×
2367
    #[tokio::test]
2368
    async fn block_request_batch_in_order_test() -> Result<()> {
1✔
2369
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2370
        // A peer requests a batch of blocks starting from block 1. Ensure that the correct blocks
1✔
2371
        // are returned.
1✔
2372

1✔
2373
        let network = Network::Main;
1✔
2374
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2375
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2376
        let genesis_block: Block = Block::genesis(network);
1✔
2377
        let peer_address = get_dummy_socket_address(0);
1✔
2378
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2379
            &genesis_block,
1✔
2380
            Timestamp::hours(1),
1✔
2381
            StdRng::seed_from_u64(5550001).random(),
1✔
2382
        )
1✔
2383
        .await;
1✔
2384
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2385
            &block_1,
1✔
2386
            Timestamp::hours(1),
1✔
2387
            StdRng::seed_from_u64(5550002).random(),
1✔
2388
        )
1✔
2389
        .await;
1✔
2390
        assert_ne!(block_2_b.hash(), block_2_a.hash());
1✔
2391

1✔
2392
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2393
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2394
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2395
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2396
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2397

1✔
2398
        let anchor = state_lock
1✔
2399
            .lock_guard()
1✔
2400
            .await
1✔
2401
            .chain
1✔
2402
            .archival_state()
1✔
2403
            .archival_block_mmr
1✔
2404
            .ammr()
1✔
2405
            .to_accumulator_async()
1✔
2406
            .await;
1✔
2407
        let response_1 = {
1✔
2408
            let state_lock = state_lock.lock_guard().await;
1✔
2409
            PeerLoopHandler::batch_response(
1✔
2410
                &state_lock,
1✔
2411
                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
1✔
2412
                &anchor,
1✔
2413
            )
1✔
2414
            .await
1✔
2415
            .unwrap()
1✔
2416
        };
1✔
2417

1✔
2418
        let mut mock = Mock::new(vec![
1✔
2419
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2420
                known_blocks: vec![genesis_block.hash()],
1✔
2421
                max_response_len: 14,
1✔
2422
                anchor: anchor.clone(),
1✔
2423
            })),
1✔
2424
            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
1✔
2425
            Action::Read(PeerMessage::Bye),
1✔
2426
        ]);
1✔
2427

1✔
2428
        let mut peer_loop_handler_1 = PeerLoopHandler::with_mocked_time(
1✔
2429
            to_main_tx.clone(),
1✔
2430
            state_lock.clone(),
1✔
2431
            peer_address,
1✔
2432
            hsd.clone(),
1✔
2433
            false,
1✔
2434
            1,
1✔
2435
            block_3_a.header().timestamp,
1✔
2436
        );
1✔
2437

1✔
2438
        peer_loop_handler_1
1✔
2439
            .run_wrapper(mock, from_main_rx_clone.resubscribe())
1✔
2440
            .await?;
1✔
2441

1✔
2442
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2443
        let response_2 = {
1✔
2444
            let state_lock = state_lock.lock_guard().await;
1✔
2445
            PeerLoopHandler::batch_response(
1✔
2446
                &state_lock,
1✔
2447
                vec![block_2_a, block_3_a.clone()],
1✔
2448
                &anchor,
1✔
2449
            )
1✔
2450
            .await
1✔
2451
            .unwrap()
1✔
2452
        };
1✔
2453
        mock = Mock::new(vec![
1✔
2454
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2455
                known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
1✔
2456
                max_response_len: 14,
1✔
2457
                anchor,
1✔
2458
            })),
1✔
2459
            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
1✔
2460
            Action::Read(PeerMessage::Bye),
1✔
2461
        ]);
1✔
2462

1✔
2463
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2464
            to_main_tx.clone(),
1✔
2465
            state_lock.clone(),
1✔
2466
            peer_address,
1✔
2467
            hsd,
1✔
2468
            false,
1✔
2469
            1,
1✔
2470
            block_3_a.header().timestamp,
1✔
2471
        );
1✔
2472

1✔
2473
        peer_loop_handler_2
1✔
2474
            .run_wrapper(mock, from_main_rx_clone)
1✔
2475
            .await?;
1✔
2476

1✔
2477
        Ok(())
1✔
2478
    }
1✔
2479

2480
    #[traced_test]
×
2481
    #[tokio::test]
2482
    async fn block_request_batch_out_of_order_test() -> Result<()> {
1✔
2483
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2484
        // A peer requests a batch of blocks starting from block 1, but the peer supplies their
1✔
2485
        // hashes in a wrong order. Ensure that the correct blocks are returned, in the right order.
1✔
2486
        // The blocks will be supplied in the correct order but starting from the first digest in
1✔
2487
        // the list that is known and canonical.
1✔
2488

1✔
2489
        let network = Network::Main;
1✔
2490
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2491
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2492
        let genesis_block = Block::genesis(network);
1✔
2493
        let peer_address = get_dummy_socket_address(0);
1✔
2494
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2495
            &genesis_block,
1✔
2496
            Timestamp::hours(1),
1✔
2497
            StdRng::seed_from_u64(5550001).random(),
1✔
2498
        )
1✔
2499
        .await;
1✔
2500
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2501
            &block_1,
1✔
2502
            Timestamp::hours(1),
1✔
2503
            StdRng::seed_from_u64(5550002).random(),
1✔
2504
        )
1✔
2505
        .await;
1✔
2506
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2507

1✔
2508
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2509
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2510
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2511
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2512
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2513

1✔
2514
        // Peer knows block 2_b, verify that canonical chain with 2_a is returned
1✔
2515
        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
1✔
2516
        expected_anchor.append(block_3_a.hash());
1✔
2517
        let state_anchor = state_lock
1✔
2518
            .lock_guard()
1✔
2519
            .await
1✔
2520
            .chain
1✔
2521
            .archival_state()
1✔
2522
            .archival_block_mmr
1✔
2523
            .ammr()
1✔
2524
            .to_accumulator_async()
1✔
2525
            .await;
1✔
2526
        assert_eq!(
1✔
2527
            expected_anchor, state_anchor,
1✔
2528
            "Catching assumption about MMRA in tip and in archival state"
1✔
2529
        );
1✔
2530

1✔
2531
        let response = {
1✔
2532
            let state_lock = state_lock.lock_guard().await;
1✔
2533
            PeerLoopHandler::batch_response(
1✔
2534
                &state_lock,
1✔
2535
                vec![block_1.clone(), block_2_a, block_3_a.clone()],
1✔
2536
                &expected_anchor,
1✔
2537
            )
1✔
2538
            .await
1✔
2539
            .unwrap()
1✔
2540
        };
1✔
2541
        let mock = Mock::new(vec![
1✔
2542
            Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
1✔
2543
                known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
1✔
2544
                max_response_len: 14,
1✔
2545
                anchor: expected_anchor,
1✔
2546
            })),
1✔
2547
            // Since genesis block is the 1st known in the list of known blocks,
1✔
2548
            // it's immediate descendent, block_1, is the first one returned.
1✔
2549
            Action::Write(PeerMessage::BlockResponseBatch(response)),
1✔
2550
            Action::Read(PeerMessage::Bye),
1✔
2551
        ]);
1✔
2552

1✔
2553
        let mut peer_loop_handler_2 = PeerLoopHandler::with_mocked_time(
1✔
2554
            to_main_tx.clone(),
1✔
2555
            state_lock.clone(),
1✔
2556
            peer_address,
1✔
2557
            hsd,
1✔
2558
            false,
1✔
2559
            1,
1✔
2560
            block_3_a.header().timestamp,
1✔
2561
        );
1✔
2562

1✔
2563
        peer_loop_handler_2
1✔
2564
            .run_wrapper(mock, from_main_rx_clone)
1✔
2565
            .await?;
1✔
2566

1✔
2567
        Ok(())
1✔
2568
    }
1✔
2569

2570
    #[traced_test]
×
2571
    #[tokio::test]
2572
    async fn request_unknown_height_doesnt_crash() {
1✔
2573
        // Scenario: Only genesis block is known. Peer requests block of height
1✔
2574
        // 2.
1✔
2575
        let network = Network::Main;
1✔
2576
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, state_lock, hsd) =
1✔
2577
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2578
                .await
1✔
2579
                .unwrap();
1✔
2580
        let peer_address = get_dummy_socket_address(0);
1✔
2581
        let mock = Mock::new(vec![
1✔
2582
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2583
            Action::Read(PeerMessage::Bye),
1✔
2584
        ]);
1✔
2585

1✔
2586
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2587
            to_main_tx.clone(),
1✔
2588
            state_lock.clone(),
1✔
2589
            peer_address,
1✔
2590
            hsd,
1✔
2591
            false,
1✔
2592
            1,
1✔
2593
        );
1✔
2594

1✔
2595
        // This will return error if seen read/write order does not match that of the
1✔
2596
        // mocked object.
1✔
2597
        peer_loop_handler
1✔
2598
            .run_wrapper(mock, from_main_rx_clone)
1✔
2599
            .await
1✔
2600
            .unwrap();
1✔
2601

1✔
2602
        // Verify that peer is sanctioned for this nonsense.
1✔
2603
        assert!(state_lock
1✔
2604
            .lock_guard()
1✔
2605
            .await
1✔
2606
            .net
1✔
2607
            .get_peer_standing_from_database(peer_address.ip())
1✔
2608
            .await
1✔
2609
            .unwrap()
1✔
2610
            .standing
1✔
2611
            .is_negative());
1✔
2612
    }
1✔
2613

2614
    #[traced_test]
×
2615
    #[tokio::test]
2616
    async fn find_canonical_chain_when_multiple_blocks_at_same_height_test() -> Result<()> {
1✔
2617
        // Scenario: A fork began at block 2, node knows two blocks of height 2 and two of height 3.
1✔
2618
        // A peer requests a block at height 2. Verify that the correct block at height 2 is
1✔
2619
        // returned.
1✔
2620

1✔
2621
        let network = Network::Main;
1✔
2622
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, _to_main_rx1, mut state_lock, hsd) =
1✔
2623
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2624
        let genesis_block = Block::genesis(network);
1✔
2625
        let peer_address = get_dummy_socket_address(0);
1✔
2626

1✔
2627
        let [block_1, block_2_a, block_3_a] = fake_valid_sequence_of_blocks_for_tests(
1✔
2628
            &genesis_block,
1✔
2629
            Timestamp::hours(1),
1✔
2630
            StdRng::seed_from_u64(5550001).random(),
1✔
2631
        )
1✔
2632
        .await;
1✔
2633
        let [block_2_b, block_3_b] = fake_valid_sequence_of_blocks_for_tests(
1✔
2634
            &block_1,
1✔
2635
            Timestamp::hours(1),
1✔
2636
            StdRng::seed_from_u64(5550002).random(),
1✔
2637
        )
1✔
2638
        .await;
1✔
2639
        assert_ne!(block_2_a.hash(), block_2_b.hash());
1✔
2640

1✔
2641
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2642
        state_lock.set_new_tip(block_2_a.clone()).await?;
1✔
2643
        state_lock.set_new_tip(block_2_b.clone()).await?;
1✔
2644
        state_lock.set_new_tip(block_3_b.clone()).await?;
1✔
2645
        state_lock.set_new_tip(block_3_a.clone()).await?;
1✔
2646

1✔
2647
        let mock = Mock::new(vec![
1✔
2648
            Action::Read(PeerMessage::BlockRequestByHeight(2.into())),
1✔
2649
            Action::Write(PeerMessage::Block(Box::new(block_2_a.try_into().unwrap()))),
1✔
2650
            Action::Read(PeerMessage::BlockRequestByHeight(3.into())),
1✔
2651
            Action::Write(PeerMessage::Block(Box::new(
1✔
2652
                block_3_a.clone().try_into().unwrap(),
1✔
2653
            ))),
1✔
2654
            Action::Read(PeerMessage::Bye),
1✔
2655
        ]);
1✔
2656

1✔
2657
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2658
            to_main_tx.clone(),
1✔
2659
            state_lock.clone(),
1✔
2660
            peer_address,
1✔
2661
            hsd,
1✔
2662
            false,
1✔
2663
            1,
1✔
2664
            block_3_a.header().timestamp,
1✔
2665
        );
1✔
2666

1✔
2667
        // This will return error if seen read/write order does not match that of the
1✔
2668
        // mocked object.
1✔
2669
        peer_loop_handler
1✔
2670
            .run_wrapper(mock, from_main_rx_clone)
1✔
2671
            .await?;
1✔
2672

1✔
2673
        Ok(())
1✔
2674
    }
1✔
2675

2676
    #[traced_test]
×
2677
    #[tokio::test]
2678
    async fn receival_of_block_notification_height_1() {
1✔
2679
        // Scenario: client only knows genesis block. Then receives block
1✔
2680
        // notification of height 1. Must request block 1.
1✔
2681
        let network = Network::Main;
1✔
2682
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2683
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, state_lock, hsd) =
1✔
2684
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2685
                .await
1✔
2686
                .unwrap();
1✔
2687
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2688
        let notification_height1 = (&block_1).into();
1✔
2689
        let mock = Mock::new(vec![
1✔
2690
            Action::Read(PeerMessage::BlockNotification(notification_height1)),
1✔
2691
            Action::Write(PeerMessage::BlockRequestByHeight(1u64.into())),
1✔
2692
            Action::Read(PeerMessage::Bye),
1✔
2693
        ]);
1✔
2694

1✔
2695
        let peer_address = get_dummy_socket_address(0);
1✔
2696
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2697
            to_main_tx.clone(),
1✔
2698
            state_lock.clone(),
1✔
2699
            peer_address,
1✔
2700
            hsd,
1✔
2701
            false,
1✔
2702
            1,
1✔
2703
            block_1.header().timestamp,
1✔
2704
        );
1✔
2705
        peer_loop_handler
1✔
2706
            .run_wrapper(mock, from_main_rx_clone)
1✔
2707
            .await
1✔
2708
            .unwrap();
1✔
2709

1✔
2710
        drop(to_main_rx1);
1✔
2711
    }
1✔
2712

2713
    #[traced_test]
×
2714
    #[tokio::test]
2715
    async fn receive_block_request_by_height_block_7() {
1✔
2716
        // Scenario: client only knows blocks up to height 7. Then receives block-
1✔
2717
        // request-by-height for height 7. Must respond with block 7.
1✔
2718
        let network = Network::Main;
1✔
2719
        let mut rng = StdRng::seed_from_u64(5552401);
1✔
2720
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, to_main_rx1, mut state_lock, hsd) =
1✔
2721
            get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2722
                .await
1✔
2723
                .unwrap();
1✔
2724
        let genesis_block = Block::genesis(network);
1✔
2725
        let blocks: [Block; 7] = fake_valid_sequence_of_blocks_for_tests(
1✔
2726
            &genesis_block,
1✔
2727
            Timestamp::hours(1),
1✔
2728
            rng.random(),
1✔
2729
        )
1✔
2730
        .await;
1✔
2731
        let block7 = blocks.last().unwrap().to_owned();
1✔
2732
        let tip_height: u64 = block7.header().height.into();
1✔
2733
        assert_eq!(7, tip_height);
1✔
2734

1✔
2735
        for block in &blocks {
8✔
2736
            state_lock.set_new_tip(block.to_owned()).await.unwrap();
7✔
2737
        }
1✔
2738

1✔
2739
        let block7_response = PeerMessage::Block(Box::new(block7.try_into().unwrap()));
1✔
2740
        let mock = Mock::new(vec![
1✔
2741
            Action::Read(PeerMessage::BlockRequestByHeight(7u64.into())),
1✔
2742
            Action::Write(block7_response),
1✔
2743
            Action::Read(PeerMessage::Bye),
1✔
2744
        ]);
1✔
2745

1✔
2746
        let peer_address = get_dummy_socket_address(0);
1✔
2747
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
2748
            to_main_tx.clone(),
1✔
2749
            state_lock.clone(),
1✔
2750
            peer_address,
1✔
2751
            hsd,
1✔
2752
            false,
1✔
2753
            1,
1✔
2754
        );
1✔
2755
        peer_loop_handler
1✔
2756
            .run_wrapper(mock, from_main_rx_clone)
1✔
2757
            .await
1✔
2758
            .unwrap();
1✔
2759

1✔
2760
        drop(to_main_rx1);
1✔
2761
    }
1✔
2762

2763
    #[traced_test]
×
2764
    #[tokio::test]
2765
    async fn test_peer_loop_receival_of_first_block() -> Result<()> {
1✔
2766
        // Scenario: client only knows genesis block. Then receives block 1.
1✔
2767

1✔
2768
        let network = Network::Main;
1✔
2769
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2770
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2771
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2772
        let peer_address = get_dummy_socket_address(0);
1✔
2773

1✔
2774
        let block_1 = fake_valid_block_for_tests(&state_lock, rng.random()).await;
1✔
2775
        let mock = Mock::new(vec![
1✔
2776
            Action::Read(PeerMessage::Block(Box::new(
1✔
2777
                block_1.clone().try_into().unwrap(),
1✔
2778
            ))),
1✔
2779
            Action::Read(PeerMessage::Bye),
1✔
2780
        ]);
1✔
2781

1✔
2782
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2783
            to_main_tx.clone(),
1✔
2784
            state_lock.clone(),
1✔
2785
            peer_address,
1✔
2786
            hsd,
1✔
2787
            false,
1✔
2788
            1,
1✔
2789
            block_1.header().timestamp,
1✔
2790
        );
1✔
2791
        peer_loop_handler
1✔
2792
            .run_wrapper(mock, from_main_rx_clone)
1✔
2793
            .await?;
1✔
2794

1✔
2795
        // Verify that a block was sent to `main_loop`
1✔
2796
        match to_main_rx1.recv().await {
1✔
2797
            Some(PeerTaskToMain::NewBlocks(_block)) => (),
1✔
2798
            _ => bail!("Did not find msg sent to main task"),
1✔
2799
        };
1✔
2800

1✔
2801
        match to_main_rx1.recv().await {
1✔
2802
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2803
            _ => bail!("Must receive remove of peer block max height"),
1✔
2804
        }
1✔
2805

1✔
2806
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2807
            bail!("peer map must be empty after closing connection gracefully");
1✔
2808
        }
1✔
2809

1✔
2810
        Ok(())
1✔
2811
    }
1✔
2812

2813
    #[traced_test]
×
2814
    #[tokio::test]
2815
    async fn test_peer_loop_receival_of_second_block_no_blocks_in_db() -> Result<()> {
1✔
2816
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
2817
        // receives block 2, meaning that block 1 will have to be requested.
1✔
2818

1✔
2819
        let network = Network::Main;
1✔
2820
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
2821
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
2822
        let peer_address = get_dummy_socket_address(0);
1✔
2823
        let genesis_block: Block = state_lock
1✔
2824
            .lock_guard()
1✔
2825
            .await
1✔
2826
            .chain
1✔
2827
            .archival_state()
1✔
2828
            .get_tip()
1✔
2829
            .await;
1✔
2830
        let [block_1, block_2] = fake_valid_sequence_of_blocks_for_tests(
1✔
2831
            &genesis_block,
1✔
2832
            Timestamp::hours(1),
1✔
2833
            StdRng::seed_from_u64(5550001).random(),
1✔
2834
        )
1✔
2835
        .await;
1✔
2836

1✔
2837
        let mock = Mock::new(vec![
1✔
2838
            Action::Read(PeerMessage::Block(Box::new(
1✔
2839
                block_2.clone().try_into().unwrap(),
1✔
2840
            ))),
1✔
2841
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
2842
            Action::Read(PeerMessage::Block(Box::new(
1✔
2843
                block_1.clone().try_into().unwrap(),
1✔
2844
            ))),
1✔
2845
            Action::Read(PeerMessage::Bye),
1✔
2846
        ]);
1✔
2847

1✔
2848
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2849
            to_main_tx.clone(),
1✔
2850
            state_lock.clone(),
1✔
2851
            peer_address,
1✔
2852
            hsd,
1✔
2853
            true,
1✔
2854
            1,
1✔
2855
            block_2.header().timestamp,
1✔
2856
        );
1✔
2857
        peer_loop_handler
1✔
2858
            .run_wrapper(mock, from_main_rx_clone)
1✔
2859
            .await?;
1✔
2860

1✔
2861
        match to_main_rx1.recv().await {
1✔
2862
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
2863
                if blocks[0].hash() != block_1.hash() {
1✔
2864
                    bail!("1st received block by main loop must be block 1");
1✔
2865
                }
1✔
2866
                if blocks[1].hash() != block_2.hash() {
1✔
2867
                    bail!("2nd received block by main loop must be block 2");
1✔
2868
                }
1✔
2869
            }
1✔
2870
            _ => bail!("Did not find msg sent to main task 1"),
1✔
2871
        };
1✔
2872
        match to_main_rx1.recv().await {
1✔
2873
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2874
            _ => bail!("Must receive remove of peer block max height"),
1✔
2875
        }
1✔
2876

1✔
2877
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
2878
            bail!("peer map must be empty after closing connection gracefully");
1✔
2879
        }
1✔
2880

1✔
2881
        Ok(())
1✔
2882
    }
1✔
2883

2884
    #[traced_test]
×
2885
    #[tokio::test]
2886
    async fn prevent_ram_exhaustion_test() -> Result<()> {
1✔
2887
        // In this scenario the peer sends more blocks than the client allows to store in the
1✔
2888
        // fork-reconciliation field. This should result in abandonment of the fork-reconciliation
1✔
2889
        // process as the alternative is that the program will crash because it runs out of RAM.
1✔
2890

1✔
2891
        let network = Network::Main;
1✔
2892
        let mut rng = StdRng::seed_from_u64(5550001);
1✔
2893
        let (
1✔
2894
            _peer_broadcast_tx,
1✔
2895
            from_main_rx_clone,
1✔
2896
            to_main_tx,
1✔
2897
            mut to_main_rx1,
1✔
2898
            mut state_lock,
1✔
2899
            _hsd,
1✔
2900
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
2901
        let genesis_block = Block::genesis(network);
1✔
2902

1✔
2903
        // Restrict max number of blocks held in memory to 2.
1✔
2904
        let mut cli = state_lock.cli().clone();
1✔
2905
        cli.sync_mode_threshold = 2;
1✔
2906
        state_lock.set_cli(cli).await;
1✔
2907

1✔
2908
        let (hsd1, peer_address1) = get_dummy_peer_connection_data_genesis(Network::Alpha, 1);
1✔
2909
        let [block_1, _block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2910
            &genesis_block,
1✔
2911
            Timestamp::hours(1),
1✔
2912
            rng.random(),
1✔
2913
        )
1✔
2914
        .await;
1✔
2915
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
2916

1✔
2917
        let mock = Mock::new(vec![
1✔
2918
            Action::Read(PeerMessage::Block(Box::new(
1✔
2919
                block_4.clone().try_into().unwrap(),
1✔
2920
            ))),
1✔
2921
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
2922
            Action::Read(PeerMessage::Block(Box::new(
1✔
2923
                block_3.clone().try_into().unwrap(),
1✔
2924
            ))),
1✔
2925
            Action::Read(PeerMessage::Bye),
1✔
2926
        ]);
1✔
2927

1✔
2928
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
2929
            to_main_tx.clone(),
1✔
2930
            state_lock.clone(),
1✔
2931
            peer_address1,
1✔
2932
            hsd1,
1✔
2933
            true,
1✔
2934
            1,
1✔
2935
            block_4.header().timestamp,
1✔
2936
        );
1✔
2937
        peer_loop_handler
1✔
2938
            .run_wrapper(mock, from_main_rx_clone)
1✔
2939
            .await?;
1✔
2940

1✔
2941
        match to_main_rx1.recv().await {
1✔
2942
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
2943
            _ => bail!("Must receive remove of peer block max height"),
1✔
2944
        }
1✔
2945

1✔
2946
        // Verify that no block is sent to main loop.
1✔
2947
        match to_main_rx1.try_recv() {
1✔
2948
            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (),
1✔
2949
            _ => bail!("Peer must not handle more fork-reconciliation blocks than specified in CLI arguments"),
1✔
2950
        };
1✔
2951
        drop(to_main_tx);
1✔
2952

1✔
2953
        // Verify that peer is sanctioned for failed fork reconciliation attempt
1✔
2954
        assert!(state_lock
1✔
2955
            .lock_guard()
1✔
2956
            .await
1✔
2957
            .net
1✔
2958
            .get_peer_standing_from_database(peer_address1.ip())
1✔
2959
            .await
1✔
2960
            .unwrap()
1✔
2961
            .standing
1✔
2962
            .is_negative());
1✔
2963

1✔
2964
        Ok(())
1✔
2965
    }
1✔
2966

2967
    #[traced_test]
×
2968
    #[tokio::test]
2969
    async fn test_peer_loop_receival_of_fourth_block_one_block_in_db() {
1✔
2970
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
2971
        // then receives block 4, meaning that block 3 and 2 will have to be requested.
1✔
2972

1✔
2973
        let network = Network::Main;
1✔
2974
        let (
1✔
2975
            _peer_broadcast_tx,
1✔
2976
            from_main_rx_clone,
1✔
2977
            to_main_tx,
1✔
2978
            mut to_main_rx1,
1✔
2979
            mut state_lock,
1✔
2980
            hsd,
1✔
2981
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
2982
            .await
1✔
2983
            .unwrap();
1✔
2984
        let peer_address: SocketAddr = get_dummy_socket_address(0);
1✔
2985
        let genesis_block = Block::genesis(network);
1✔
2986
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
2987
            &genesis_block,
1✔
2988
            Timestamp::hours(1),
1✔
2989
            StdRng::seed_from_u64(5550001).random(),
1✔
2990
        )
1✔
2991
        .await;
1✔
2992
        state_lock.set_new_tip(block_1.clone()).await.unwrap();
1✔
2993

1✔
2994
        let mock = Mock::new(vec![
1✔
2995
            Action::Read(PeerMessage::Block(Box::new(
1✔
2996
                block_4.clone().try_into().unwrap(),
1✔
2997
            ))),
1✔
2998
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
2999
            Action::Read(PeerMessage::Block(Box::new(
1✔
3000
                block_3.clone().try_into().unwrap(),
1✔
3001
            ))),
1✔
3002
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3003
            Action::Read(PeerMessage::Block(Box::new(
1✔
3004
                block_2.clone().try_into().unwrap(),
1✔
3005
            ))),
1✔
3006
            Action::Read(PeerMessage::Bye),
1✔
3007
        ]);
1✔
3008

1✔
3009
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3010
            to_main_tx.clone(),
1✔
3011
            state_lock.clone(),
1✔
3012
            peer_address,
1✔
3013
            hsd,
1✔
3014
            true,
1✔
3015
            1,
1✔
3016
            block_4.header().timestamp,
1✔
3017
        );
1✔
3018
        peer_loop_handler
1✔
3019
            .run_wrapper(mock, from_main_rx_clone)
1✔
3020
            .await
1✔
3021
            .unwrap();
1✔
3022

1✔
3023
        let Some(PeerTaskToMain::NewBlocks(blocks)) = to_main_rx1.recv().await else {
1✔
3024
            panic!("Did not find msg sent to main task");
1✔
3025
        };
1✔
3026
        assert_eq!(blocks[0].hash(), block_2.hash());
1✔
3027
        assert_eq!(blocks[1].hash(), block_3.hash());
1✔
3028
        assert_eq!(blocks[2].hash(), block_4.hash());
1✔
3029

1✔
3030
        let Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) = to_main_rx1.recv().await else {
1✔
3031
            panic!("Must receive remove of peer block max height");
1✔
3032
        };
1✔
3033

1✔
3034
        assert!(
1✔
3035
            state_lock.lock_guard().await.net.peer_map.is_empty(),
1✔
3036
            "peer map must be empty after closing connection gracefully"
1✔
3037
        );
1✔
3038
    }
1✔
3039

3040
    #[traced_test]
×
3041
    #[tokio::test]
3042
    async fn test_peer_loop_receival_of_third_block_no_blocks_in_db() -> Result<()> {
1✔
3043
        // In this scenario, the client only knows the genesis block (block 0) and then
1✔
3044
        // receives block 3, meaning that block 2 and 1 will have to be requested.
1✔
3045

1✔
3046
        let network = Network::Main;
1✔
3047
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, hsd) =
1✔
3048
            get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3049
        let peer_address = get_dummy_socket_address(0);
1✔
3050
        let genesis_block = Block::genesis(network);
1✔
3051

1✔
3052
        let [block_1, block_2, block_3] = fake_valid_sequence_of_blocks_for_tests(
1✔
3053
            &genesis_block,
1✔
3054
            Timestamp::hours(1),
1✔
3055
            StdRng::seed_from_u64(5550001).random(),
1✔
3056
        )
1✔
3057
        .await;
1✔
3058

1✔
3059
        let mock = Mock::new(vec![
1✔
3060
            Action::Read(PeerMessage::Block(Box::new(
1✔
3061
                block_3.clone().try_into().unwrap(),
1✔
3062
            ))),
1✔
3063
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3064
            Action::Read(PeerMessage::Block(Box::new(
1✔
3065
                block_2.clone().try_into().unwrap(),
1✔
3066
            ))),
1✔
3067
            Action::Write(PeerMessage::BlockRequestByHash(block_1.hash())),
1✔
3068
            Action::Read(PeerMessage::Block(Box::new(
1✔
3069
                block_1.clone().try_into().unwrap(),
1✔
3070
            ))),
1✔
3071
            Action::Read(PeerMessage::Bye),
1✔
3072
        ]);
1✔
3073

1✔
3074
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3075
            to_main_tx.clone(),
1✔
3076
            state_lock.clone(),
1✔
3077
            peer_address,
1✔
3078
            hsd,
1✔
3079
            true,
1✔
3080
            1,
1✔
3081
            block_3.header().timestamp,
1✔
3082
        );
1✔
3083
        peer_loop_handler
1✔
3084
            .run_wrapper(mock, from_main_rx_clone)
1✔
3085
            .await?;
1✔
3086

1✔
3087
        match to_main_rx1.recv().await {
1✔
3088
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3089
                if blocks[0].hash() != block_1.hash() {
1✔
3090
                    bail!("1st received block by main loop must be block 1");
1✔
3091
                }
1✔
3092
                if blocks[1].hash() != block_2.hash() {
1✔
3093
                    bail!("2nd received block by main loop must be block 2");
1✔
3094
                }
1✔
3095
                if blocks[2].hash() != block_3.hash() {
1✔
3096
                    bail!("3rd received block by main loop must be block 3");
1✔
3097
                }
1✔
3098
            }
1✔
3099
            _ => bail!("Did not find msg sent to main task"),
1✔
3100
        };
1✔
3101
        match to_main_rx1.recv().await {
1✔
3102
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3103
            _ => bail!("Must receive remove of peer block max height"),
1✔
3104
        }
1✔
3105

1✔
3106
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3107
            bail!("peer map must be empty after closing connection gracefully");
1✔
3108
        }
1✔
3109

1✔
3110
        Ok(())
1✔
3111
    }
1✔
3112

3113
    #[traced_test]
×
3114
    #[tokio::test]
3115
    async fn test_block_reconciliation_interrupted_by_block_notification() -> Result<()> {
1✔
3116
        // In this scenario, the client know the genesis block (block 0) and block 1, it
1✔
3117
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3118
        // But the requests are interrupted by the peer sending another message: a new block
1✔
3119
        // notification.
1✔
3120

1✔
3121
        let network = Network::Main;
1✔
3122
        let (
1✔
3123
            _peer_broadcast_tx,
1✔
3124
            from_main_rx_clone,
1✔
3125
            to_main_tx,
1✔
3126
            mut to_main_rx1,
1✔
3127
            mut state_lock,
1✔
3128
            hsd,
1✔
3129
        ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3130
        let peer_socket_address: SocketAddr = get_dummy_socket_address(0);
1✔
3131
        let genesis_block: Block = state_lock
1✔
3132
            .lock_guard()
1✔
3133
            .await
1✔
3134
            .chain
1✔
3135
            .archival_state()
1✔
3136
            .get_tip()
1✔
3137
            .await;
1✔
3138

1✔
3139
        let [block_1, block_2, block_3, block_4, block_5] =
1✔
3140
            fake_valid_sequence_of_blocks_for_tests(
1✔
3141
                &genesis_block,
1✔
3142
                Timestamp::hours(1),
1✔
3143
                StdRng::seed_from_u64(5550001).random(),
1✔
3144
            )
1✔
3145
            .await;
1✔
3146
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3147

1✔
3148
        let mock = Mock::new(vec![
1✔
3149
            Action::Read(PeerMessage::Block(Box::new(
1✔
3150
                block_4.clone().try_into().unwrap(),
1✔
3151
            ))),
1✔
3152
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3153
            Action::Read(PeerMessage::Block(Box::new(
1✔
3154
                block_3.clone().try_into().unwrap(),
1✔
3155
            ))),
1✔
3156
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3157
            //
1✔
3158
            // Now make the interruption of the block reconciliation process
1✔
3159
            Action::Read(PeerMessage::BlockNotification((&block_5).into())),
1✔
3160
            //
1✔
3161
            // Complete the block reconciliation process by requesting the last block
1✔
3162
            // in this process, to get back to a mutually known block.
1✔
3163
            Action::Read(PeerMessage::Block(Box::new(
1✔
3164
                block_2.clone().try_into().unwrap(),
1✔
3165
            ))),
1✔
3166
            //
1✔
3167
            // Then anticipate the request of the block that was announced
1✔
3168
            // in the interruption.
1✔
3169
            // Note that we cannot anticipate the response, as only the main
1✔
3170
            // task writes to the database. And the database needs to be updated
1✔
3171
            // for the handling of block 5 to be done correctly.
1✔
3172
            Action::Write(PeerMessage::BlockRequestByHeight(
1✔
3173
                block_5.kernel.header.height,
1✔
3174
            )),
1✔
3175
            Action::Read(PeerMessage::Bye),
1✔
3176
        ]);
1✔
3177

1✔
3178
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3179
            to_main_tx.clone(),
1✔
3180
            state_lock.clone(),
1✔
3181
            peer_socket_address,
1✔
3182
            hsd,
1✔
3183
            false,
1✔
3184
            1,
1✔
3185
            block_5.header().timestamp,
1✔
3186
        );
1✔
3187
        peer_loop_handler
1✔
3188
            .run_wrapper(mock, from_main_rx_clone)
1✔
3189
            .await?;
1✔
3190

1✔
3191
        match to_main_rx1.recv().await {
1✔
3192
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3193
                if blocks[0].hash() != block_2.hash() {
1✔
3194
                    bail!("1st received block by main loop must be block 1");
1✔
3195
                }
1✔
3196
                if blocks[1].hash() != block_3.hash() {
1✔
3197
                    bail!("2nd received block by main loop must be block 2");
1✔
3198
                }
1✔
3199
                if blocks[2].hash() != block_4.hash() {
1✔
3200
                    bail!("3rd received block by main loop must be block 3");
1✔
3201
                }
1✔
3202
            }
1✔
3203
            _ => bail!("Did not find msg sent to main task"),
1✔
3204
        };
1✔
3205
        match to_main_rx1.recv().await {
1✔
3206
            Some(PeerTaskToMain::RemovePeerMaxBlockHeight(_)) => (),
1✔
3207
            _ => bail!("Must receive remove of peer block max height"),
1✔
3208
        }
1✔
3209

1✔
3210
        if !state_lock.lock_guard().await.net.peer_map.is_empty() {
1✔
3211
            bail!("peer map must be empty after closing connection gracefully");
1✔
3212
        }
1✔
3213

1✔
3214
        Ok(())
1✔
3215
    }
1✔
3216

3217
    #[traced_test]
×
3218
    #[tokio::test]
3219
    async fn test_block_reconciliation_interrupted_by_peer_list_request() -> Result<()> {
1✔
3220
        // In this scenario, the client knows the genesis block (block 0) and block 1, it
1✔
3221
        // then receives block 4, meaning that block 3, 2, and 1 will have to be requested.
1✔
3222
        // But the requests are interrupted by the peer sending another message: a request
1✔
3223
        // for a list of peers.
1✔
3224

1✔
3225
        let network = Network::Main;
1✔
3226
        let (
1✔
3227
            _peer_broadcast_tx,
1✔
3228
            from_main_rx_clone,
1✔
3229
            to_main_tx,
1✔
3230
            mut to_main_rx1,
1✔
3231
            mut state_lock,
1✔
3232
            _hsd,
1✔
3233
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default()).await?;
1✔
3234
        let genesis_block = Block::genesis(network);
1✔
3235
        let peer_infos: Vec<PeerInfo> = state_lock
1✔
3236
            .lock_guard()
1✔
3237
            .await
1✔
3238
            .net
1✔
3239
            .peer_map
1✔
3240
            .clone()
1✔
3241
            .into_values()
1✔
3242
            .collect::<Vec<_>>();
1✔
3243

1✔
3244
        let [block_1, block_2, block_3, block_4] = fake_valid_sequence_of_blocks_for_tests(
1✔
3245
            &genesis_block,
1✔
3246
            Timestamp::hours(1),
1✔
3247
            StdRng::seed_from_u64(5550001).random(),
1✔
3248
        )
1✔
3249
        .await;
1✔
3250
        state_lock.set_new_tip(block_1.clone()).await?;
1✔
3251

1✔
3252
        let (hsd_1, sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3253
        let expected_peer_list_resp = vec![
1✔
3254
            (
1✔
3255
                peer_infos[0].listen_address().unwrap(),
1✔
3256
                peer_infos[0].instance_id(),
1✔
3257
            ),
1✔
3258
            (sa_1, hsd_1.instance_id),
1✔
3259
        ];
1✔
3260
        let mock = Mock::new(vec![
1✔
3261
            Action::Read(PeerMessage::Block(Box::new(
1✔
3262
                block_4.clone().try_into().unwrap(),
1✔
3263
            ))),
1✔
3264
            Action::Write(PeerMessage::BlockRequestByHash(block_3.hash())),
1✔
3265
            Action::Read(PeerMessage::Block(Box::new(
1✔
3266
                block_3.clone().try_into().unwrap(),
1✔
3267
            ))),
1✔
3268
            Action::Write(PeerMessage::BlockRequestByHash(block_2.hash())),
1✔
3269
            //
1✔
3270
            // Now make the interruption of the block reconciliation process
1✔
3271
            Action::Read(PeerMessage::PeerListRequest),
1✔
3272
            //
1✔
3273
            // Answer the request for a peer list
1✔
3274
            Action::Write(PeerMessage::PeerListResponse(expected_peer_list_resp)),
1✔
3275
            //
1✔
3276
            // Complete the block reconciliation process by requesting the last block
1✔
3277
            // in this process, to get back to a mutually known block.
1✔
3278
            Action::Read(PeerMessage::Block(Box::new(
1✔
3279
                block_2.clone().try_into().unwrap(),
1✔
3280
            ))),
1✔
3281
            Action::Read(PeerMessage::Bye),
1✔
3282
        ]);
1✔
3283

1✔
3284
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3285
            to_main_tx,
1✔
3286
            state_lock.clone(),
1✔
3287
            sa_1,
1✔
3288
            hsd_1,
1✔
3289
            true,
1✔
3290
            1,
1✔
3291
            block_4.header().timestamp,
1✔
3292
        );
1✔
3293
        peer_loop_handler
1✔
3294
            .run_wrapper(mock, from_main_rx_clone)
1✔
3295
            .await?;
1✔
3296

1✔
3297
        // Verify that blocks are sent to `main_loop` in expected ordering
1✔
3298
        match to_main_rx1.recv().await {
1✔
3299
            Some(PeerTaskToMain::NewBlocks(blocks)) => {
1✔
3300
                if blocks[0].hash() != block_2.hash() {
1✔
3301
                    bail!("1st received block by main loop must be block 1");
1✔
3302
                }
1✔
3303
                if blocks[1].hash() != block_3.hash() {
1✔
3304
                    bail!("2nd received block by main loop must be block 2");
1✔
3305
                }
1✔
3306
                if blocks[2].hash() != block_4.hash() {
1✔
3307
                    bail!("3rd received block by main loop must be block 3");
1✔
3308
                }
1✔
3309
            }
1✔
3310
            _ => bail!("Did not find msg sent to main task"),
1✔
3311
        };
1✔
3312

1✔
3313
        assert_eq!(
1✔
3314
            1,
1✔
3315
            state_lock.lock_guard().await.net.peer_map.len(),
1✔
3316
            "One peer must remain in peer list after peer_1 closed gracefully"
1✔
3317
        );
1✔
3318

1✔
3319
        Ok(())
1✔
3320
    }
1✔
3321

NEW
3322
    #[traced_test]
×
3323
    #[tokio::test]
3324
    async fn receive_transaction_request() {
1✔
3325
        let network = Network::Main;
1✔
3326
        let dummy_tx = invalid_empty_single_proof_transaction();
1✔
3327
        let txid = dummy_tx.kernel.txid();
1✔
3328

1✔
3329
        for transaction_is_known in [false, true] {
3✔
3330
            let (_peer_broadcast_tx, from_main_rx, to_main_tx, _, mut state_lock, _hsd) =
2✔
3331
                get_test_genesis_setup(network, 1, cli_args::Args::default())
2✔
3332
                    .await
2✔
3333
                    .unwrap();
2✔
3334
            if transaction_is_known {
2✔
3335
                state_lock
1✔
3336
                    .lock_guard_mut()
1✔
3337
                    .await
1✔
3338
                    .mempool_insert(dummy_tx.clone(), TransactionOrigin::Own)
1✔
3339
                    .await;
1✔
3340
            }
1✔
3341

1✔
3342
            let mock = if transaction_is_known {
2✔
3343
                Mock::new(vec![
1✔
3344
                    Action::Read(PeerMessage::TransactionRequest(txid)),
1✔
3345
                    Action::Write(PeerMessage::Transaction(Box::new(
1✔
3346
                        (&dummy_tx).try_into().unwrap(),
1✔
3347
                    ))),
1✔
3348
                    Action::Read(PeerMessage::Bye),
1✔
3349
                ])
1✔
3350
            } else {
1✔
3351
                Mock::new(vec![
1✔
3352
                    Action::Read(PeerMessage::TransactionRequest(txid)),
1✔
3353
                    Action::Read(PeerMessage::Bye),
1✔
3354
                ])
1✔
3355
            };
1✔
3356

1✔
3357
            let hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3358
            let mut peer_state = MutablePeerState::new(hsd.tip_header.height);
2✔
3359
            let mut peer_loop_handler = PeerLoopHandler::new(
2✔
3360
                to_main_tx,
2✔
3361
                state_lock,
2✔
3362
                get_dummy_socket_address(0),
2✔
3363
                hsd,
2✔
3364
                true,
2✔
3365
                1,
2✔
3366
            );
2✔
3367

2✔
3368
            peer_loop_handler
2✔
3369
                .run(mock, from_main_rx, &mut peer_state)
2✔
3370
                .await
2✔
3371
                .unwrap();
2✔
3372
        }
1✔
3373
    }
1✔
3374

UNCOV
3375
    #[traced_test]
×
3376
    #[tokio::test]
3377
    async fn empty_mempool_request_tx_test() {
1✔
3378
        // In this scenario the client receives a transaction notification from
1✔
3379
        // a peer of a transaction it doesn't know; the client must then request it.
1✔
3380

1✔
3381
        let network = Network::Main;
1✔
3382
        let (_peer_broadcast_tx, from_main_rx_clone, to_main_tx, mut to_main_rx1, state_lock, _hsd) =
1✔
3383
            get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3384
                .await
1✔
3385
                .unwrap();
1✔
3386

1✔
3387
        let spending_key = state_lock
1✔
3388
            .lock_guard()
1✔
3389
            .await
1✔
3390
            .wallet_state
1✔
3391
            .wallet_entropy
1✔
3392
            .nth_symmetric_key_for_tests(0);
1✔
3393
        let genesis_block = Block::genesis(network);
1✔
3394
        let now = genesis_block.kernel.header.timestamp;
1✔
3395
        let (transaction_1, _, _change_output) = state_lock
1✔
3396
            .lock_guard()
1✔
3397
            .await
1✔
3398
            .create_transaction_with_prover_capability(
1✔
3399
                Default::default(),
1✔
3400
                spending_key.into(),
1✔
3401
                UtxoNotificationMedium::OffChain,
1✔
3402
                NativeCurrencyAmount::coins(0),
1✔
3403
                now,
1✔
3404
                TxProvingCapability::ProofCollection,
1✔
3405
                &TritonVmJobQueue::dummy(),
1✔
3406
            )
1✔
3407
            .await
1✔
3408
            .unwrap();
1✔
3409

1✔
3410
        // Build the resulting transaction notification
1✔
3411
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3412
        let mock = Mock::new(vec![
1✔
3413
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3414
            Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3415
            Action::Read(PeerMessage::Transaction(Box::new(
1✔
3416
                (&transaction_1).try_into().unwrap(),
1✔
3417
            ))),
1✔
3418
            Action::Read(PeerMessage::Bye),
1✔
3419
        ]);
1✔
3420

1✔
3421
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3422

1✔
3423
        // Mock a timestamp to allow transaction to be considered valid
1✔
3424
        let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
3425
            to_main_tx,
1✔
3426
            state_lock.clone(),
1✔
3427
            get_dummy_socket_address(0),
1✔
3428
            hsd_1.clone(),
1✔
3429
            true,
1✔
3430
            1,
1✔
3431
            now,
1✔
3432
        );
1✔
3433

1✔
3434
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3435

1✔
3436
        assert!(
1✔
3437
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3438
            "Mempool must be empty at init"
1✔
3439
        );
1✔
3440
        peer_loop_handler
1✔
3441
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3442
            .await
1✔
3443
            .unwrap();
1✔
3444

1✔
3445
        // Transaction must be sent to `main_loop`. The transaction is stored to the mempool
1✔
3446
        // by the `main_loop`.
1✔
3447
        match to_main_rx1.recv().await {
1✔
3448
            Some(PeerTaskToMain::Transaction(_)) => (),
1✔
3449
            _ => panic!("Must receive remove of peer block max height"),
1✔
3450
        };
1✔
3451
    }
1✔
3452

3453
    #[traced_test]
×
3454
    #[tokio::test]
3455
    async fn populated_mempool_request_tx_test() -> Result<()> {
1✔
3456
        // In this scenario the peer is informed of a transaction that it already knows
1✔
3457

1✔
3458
        let network = Network::Main;
1✔
3459
        let (
1✔
3460
            _peer_broadcast_tx,
1✔
3461
            from_main_rx_clone,
1✔
3462
            to_main_tx,
1✔
3463
            mut to_main_rx1,
1✔
3464
            mut state_lock,
1✔
3465
            _hsd,
1✔
3466
        ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
1✔
3467
            .await
1✔
3468
            .unwrap();
1✔
3469
        let spending_key = state_lock
1✔
3470
            .lock_guard()
1✔
3471
            .await
1✔
3472
            .wallet_state
1✔
3473
            .wallet_entropy
1✔
3474
            .nth_symmetric_key_for_tests(0);
1✔
3475

1✔
3476
        let genesis_block = Block::genesis(network);
1✔
3477
        let now = genesis_block.kernel.header.timestamp;
1✔
3478
        let (transaction_1, _, _change_output) = state_lock
1✔
3479
            .lock_guard()
1✔
3480
            .await
1✔
3481
            .create_transaction_with_prover_capability(
1✔
3482
                Default::default(),
1✔
3483
                spending_key.into(),
1✔
3484
                UtxoNotificationMedium::OffChain,
1✔
3485
                NativeCurrencyAmount::coins(0),
1✔
3486
                now,
1✔
3487
                TxProvingCapability::ProofCollection,
1✔
3488
                &TritonVmJobQueue::dummy(),
1✔
3489
            )
1✔
3490
            .await
1✔
3491
            .unwrap();
1✔
3492

1✔
3493
        let (hsd_1, _sa_1) = get_dummy_peer_connection_data_genesis(network, 1);
1✔
3494
        let mut peer_loop_handler = PeerLoopHandler::new(
1✔
3495
            to_main_tx,
1✔
3496
            state_lock.clone(),
1✔
3497
            get_dummy_socket_address(0),
1✔
3498
            hsd_1.clone(),
1✔
3499
            true,
1✔
3500
            1,
1✔
3501
        );
1✔
3502
        let mut peer_state = MutablePeerState::new(hsd_1.tip_header.height);
1✔
3503

1✔
3504
        assert!(
1✔
3505
            state_lock.lock_guard().await.mempool.is_empty(),
1✔
3506
            "Mempool must be empty at init"
1✔
3507
        );
1✔
3508
        state_lock
1✔
3509
            .lock_guard_mut()
1✔
3510
            .await
1✔
3511
            .mempool_insert(transaction_1.clone(), TransactionOrigin::Foreign)
1✔
3512
            .await;
1✔
3513
        assert!(
1✔
3514
            !state_lock.lock_guard().await.mempool.is_empty(),
1✔
3515
            "Mempool must be non-empty after insertion"
1✔
3516
        );
1✔
3517

1✔
3518
        // Run the peer loop and verify expected exchange -- namely that the
1✔
3519
        // tx notification is received and the the transaction is *not*
1✔
3520
        // requested.
1✔
3521
        let tx_notification: TransactionNotification = (&transaction_1).try_into().unwrap();
1✔
3522
        let mock = Mock::new(vec![
1✔
3523
            Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3524
            Action::Read(PeerMessage::Bye),
1✔
3525
        ]);
1✔
3526
        peer_loop_handler
1✔
3527
            .run(mock, from_main_rx_clone, &mut peer_state)
1✔
3528
            .await
1✔
3529
            .unwrap();
1✔
3530

1✔
3531
        // nothing is allowed to be sent to `main_loop`
1✔
3532
        match to_main_rx1.try_recv() {
1✔
3533
            Err(TryRecvError::Empty) => (),
1✔
3534
            Err(TryRecvError::Disconnected) => panic!("to_main channel must still be open"),
1✔
3535
            Ok(_) => panic!("to_main channel must be empty"),
1✔
3536
        };
1✔
3537
        Ok(())
1✔
3538
    }
1✔
3539

3540
    mod block_proposals {
3541
        use super::*;
3542
        use crate::tests::shared::get_dummy_handshake_data_for_genesis;
3543

3544
        struct TestSetup {
3545
            peer_loop_handler: PeerLoopHandler,
3546
            to_main_rx: mpsc::Receiver<PeerTaskToMain>,
3547
            from_main_rx: broadcast::Receiver<MainToPeerTask>,
3548
            peer_state: MutablePeerState,
3549
            to_main_tx: mpsc::Sender<PeerTaskToMain>,
3550
            genesis_block: Block,
3551
            peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
3552
        }
3553

3554
        async fn genesis_setup(network: Network) -> TestSetup {
2✔
3555
            let (peer_broadcast_tx, from_main_rx, to_main_tx, to_main_rx, alice, _hsd) =
2✔
3556
                get_test_genesis_setup(network, 0, cli_args::Args::default())
2✔
3557
                    .await
2✔
3558
                    .unwrap();
2✔
3559
            let peer_hsd = get_dummy_handshake_data_for_genesis(network);
2✔
3560
            let peer_loop_handler = PeerLoopHandler::new(
2✔
3561
                to_main_tx.clone(),
2✔
3562
                alice.clone(),
2✔
3563
                get_dummy_socket_address(0),
2✔
3564
                peer_hsd.clone(),
2✔
3565
                true,
2✔
3566
                1,
2✔
3567
            );
2✔
3568
            let peer_state = MutablePeerState::new(peer_hsd.tip_header.height);
2✔
3569

2✔
3570
            // (peer_loop_handler, to_main_rx1)
2✔
3571
            TestSetup {
2✔
3572
                peer_broadcast_tx,
2✔
3573
                peer_loop_handler,
2✔
3574
                to_main_rx,
2✔
3575
                from_main_rx,
2✔
3576
                peer_state,
2✔
3577
                to_main_tx,
2✔
3578
                genesis_block: Block::genesis(network),
2✔
3579
            }
2✔
3580
        }
2✔
3581

3582
        #[traced_test]
×
3583
        #[tokio::test]
3584
        async fn accept_block_proposal_height_one() {
1✔
3585
            // Node knows genesis block, receives a block proposal for block 1
1✔
3586
            // and must accept this. Verify that main loop is informed of block
1✔
3587
            // proposal.
1✔
3588
            let TestSetup {
1✔
3589
                peer_broadcast_tx,
1✔
3590
                mut peer_loop_handler,
1✔
3591
                mut to_main_rx,
1✔
3592
                from_main_rx,
1✔
3593
                mut peer_state,
1✔
3594
                to_main_tx,
1✔
3595
                genesis_block,
1✔
3596
            } = genesis_setup(Network::Main).await;
1✔
3597
            let block1 = fake_valid_block_for_tests(
1✔
3598
                &peer_loop_handler.global_state_lock,
1✔
3599
                StdRng::seed_from_u64(5550001).random(),
1✔
3600
            )
1✔
3601
            .await;
1✔
3602

1✔
3603
            let mock = Mock::new(vec![
1✔
3604
                Action::Read(PeerMessage::BlockProposal(Box::new(block1))),
1✔
3605
                Action::Read(PeerMessage::Bye),
1✔
3606
            ]);
1✔
3607
            peer_loop_handler
1✔
3608
                .run(mock, from_main_rx, &mut peer_state)
1✔
3609
                .await
1✔
3610
                .unwrap();
1✔
3611

1✔
3612
            match to_main_rx.try_recv().unwrap() {
1✔
3613
                PeerTaskToMain::BlockProposal(block) => {
1✔
3614
                    assert_eq!(genesis_block.hash(), block.header().prev_block_digest);
1✔
3615
                }
1✔
3616
                _ => panic!("Expected main loop to be informed of block proposal"),
1✔
3617
            };
1✔
3618

1✔
3619
            drop(to_main_tx);
1✔
3620
            drop(peer_broadcast_tx);
1✔
3621
        }
1✔
3622

3623
        #[traced_test]
×
3624
        #[tokio::test]
3625
        async fn accept_block_proposal_notification_height_one() {
1✔
3626
            // Node knows genesis block, receives a block proposal notification
1✔
3627
            // for block 1 and must accept this by requesting the block
1✔
3628
            // proposal from peer.
1✔
3629
            let TestSetup {
1✔
3630
                peer_broadcast_tx,
1✔
3631
                mut peer_loop_handler,
1✔
3632
                to_main_rx: _,
1✔
3633
                from_main_rx,
1✔
3634
                mut peer_state,
1✔
3635
                to_main_tx,
1✔
3636
                ..
1✔
3637
            } = genesis_setup(Network::Main).await;
1✔
3638
            let block1 = fake_valid_block_for_tests(
1✔
3639
                &peer_loop_handler.global_state_lock,
1✔
3640
                StdRng::seed_from_u64(5550001).random(),
1✔
3641
            )
1✔
3642
            .await;
1✔
3643

1✔
3644
            let mock = Mock::new(vec![
1✔
3645
                Action::Read(PeerMessage::BlockProposalNotification((&block1).into())),
1✔
3646
                Action::Write(PeerMessage::BlockProposalRequest(
1✔
3647
                    BlockProposalRequest::new(block1.body().mast_hash()),
1✔
3648
                )),
1✔
3649
                Action::Read(PeerMessage::Bye),
1✔
3650
            ]);
1✔
3651
            peer_loop_handler
1✔
3652
                .run(mock, from_main_rx, &mut peer_state)
1✔
3653
                .await
1✔
3654
                .unwrap();
1✔
3655

1✔
3656
            drop(to_main_tx);
1✔
3657
            drop(peer_broadcast_tx);
1✔
3658
        }
1✔
3659
    }
3660

3661
    mod proof_qualities {
3662
        use strum::IntoEnumIterator;
3663

3664
        use super::*;
3665
        use crate::config_models::cli_args;
3666
        use crate::models::blockchain::transaction::Transaction;
3667
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
3668
        use crate::tests::shared::mock_genesis_global_state;
3669

3670
        async fn tx_of_proof_quality(
2✔
3671
            network: Network,
2✔
3672
            quality: TransactionProofQuality,
2✔
3673
        ) -> Transaction {
2✔
3674
            let wallet_secret = WalletEntropy::devnet_wallet();
2✔
3675
            let alice_key = wallet_secret.nth_generation_spending_key_for_tests(0);
2✔
3676
            let alice =
2✔
3677
                mock_genesis_global_state(network, 1, wallet_secret, cli_args::Args::default())
2✔
3678
                    .await;
2✔
3679
            let alice = alice.lock_guard().await;
2✔
3680
            let genesis_block = alice.chain.light_state();
2✔
3681
            let in_seven_months = genesis_block.header().timestamp + Timestamp::months(7);
2✔
3682
            let prover_capability = match quality {
2✔
3683
                TransactionProofQuality::ProofCollection => TxProvingCapability::ProofCollection,
1✔
3684
                TransactionProofQuality::SingleProof => TxProvingCapability::SingleProof,
1✔
3685
            };
3686
            alice
2✔
3687
                .create_transaction_with_prover_capability(
2✔
3688
                    vec![].into(),
2✔
3689
                    alice_key.into(),
2✔
3690
                    UtxoNotificationMedium::OffChain,
2✔
3691
                    NativeCurrencyAmount::coins(1),
2✔
3692
                    in_seven_months,
2✔
3693
                    prover_capability,
2✔
3694
                    &TritonVmJobQueue::dummy(),
2✔
3695
                )
2✔
3696
                .await
2✔
3697
                .unwrap()
2✔
3698
                .0
2✔
3699
        }
2✔
3700

3701
        #[traced_test]
×
3702
        #[tokio::test]
3703
        async fn client_favors_higher_proof_quality() {
1✔
3704
            // In this scenario the peer is informed of a transaction that it
1✔
3705
            // already knows, and it's tested that it checks the proof quality
1✔
3706
            // field and verifies that it exceeds the proof in the mempool
1✔
3707
            // before requesting the transasction.
1✔
3708
            let network = Network::Main;
1✔
3709
            let proof_collection_tx =
1✔
3710
                tx_of_proof_quality(network, TransactionProofQuality::ProofCollection).await;
1✔
3711
            let single_proof_tx =
1✔
3712
                tx_of_proof_quality(network, TransactionProofQuality::SingleProof).await;
1✔
3713

1✔
3714
            for (own_tx_pq, new_tx_pq) in
4✔
3715
                TransactionProofQuality::iter().cartesian_product(TransactionProofQuality::iter())
1✔
3716
            {
1✔
3717
                use TransactionProofQuality::*;
1✔
3718

1✔
3719
                let (
1✔
3720
                    _peer_broadcast_tx,
4✔
3721
                    from_main_rx_clone,
4✔
3722
                    to_main_tx,
4✔
3723
                    mut to_main_rx1,
4✔
3724
                    mut alice,
4✔
3725
                    handshake_data,
4✔
3726
                ) = get_test_genesis_setup(network, 1, cli_args::Args::default())
4✔
3727
                    .await
4✔
3728
                    .unwrap();
4✔
3729

1✔
3730
                let (own_tx, new_tx) = match (own_tx_pq, new_tx_pq) {
4✔
3731
                    (ProofCollection, ProofCollection) => {
1✔
3732
                        (&proof_collection_tx, &proof_collection_tx)
1✔
3733
                    }
1✔
3734
                    (ProofCollection, SingleProof) => (&proof_collection_tx, &single_proof_tx),
1✔
3735
                    (SingleProof, ProofCollection) => (&single_proof_tx, &proof_collection_tx),
1✔
3736
                    (SingleProof, SingleProof) => (&single_proof_tx, &single_proof_tx),
1✔
3737
                };
1✔
3738

1✔
3739
                alice
4✔
3740
                    .lock_guard_mut()
4✔
3741
                    .await
4✔
3742
                    .mempool_insert(own_tx.to_owned(), TransactionOrigin::Foreign)
4✔
3743
                    .await;
4✔
3744

1✔
3745
                let tx_notification: TransactionNotification = new_tx.try_into().unwrap();
4✔
3746

4✔
3747
                let own_proof_is_supreme = own_tx_pq >= new_tx_pq;
4✔
3748
                let mock = if own_proof_is_supreme {
4✔
3749
                    Mock::new(vec![
3✔
3750
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
3✔
3751
                        Action::Read(PeerMessage::Bye),
3✔
3752
                    ])
3✔
3753
                } else {
1✔
3754
                    Mock::new(vec![
1✔
3755
                        Action::Read(PeerMessage::TransactionNotification(tx_notification)),
1✔
3756
                        Action::Write(PeerMessage::TransactionRequest(tx_notification.txid)),
1✔
3757
                        Action::Read(PeerMessage::Transaction(Box::new(
1✔
3758
                            new_tx.try_into().unwrap(),
1✔
3759
                        ))),
1✔
3760
                        Action::Read(PeerMessage::Bye),
1✔
3761
                    ])
1✔
3762
                };
1✔
3763

1✔
3764
                let now = proof_collection_tx.kernel.timestamp;
4✔
3765
                let mut peer_loop_handler = PeerLoopHandler::with_mocked_time(
4✔
3766
                    to_main_tx,
4✔
3767
                    alice.clone(),
4✔
3768
                    get_dummy_socket_address(0),
4✔
3769
                    handshake_data.clone(),
4✔
3770
                    true,
4✔
3771
                    1,
4✔
3772
                    now,
4✔
3773
                );
4✔
3774
                let mut peer_state = MutablePeerState::new(handshake_data.tip_header.height);
4✔
3775

4✔
3776
                peer_loop_handler
4✔
3777
                    .run(mock, from_main_rx_clone, &mut peer_state)
4✔
3778
                    .await
4✔
3779
                    .unwrap();
4✔
3780

4✔
3781
                if own_proof_is_supreme {
4✔
3782
                    match to_main_rx1.try_recv() {
3✔
3783
                        Err(TryRecvError::Empty) => (),
3✔
3784
                        Err(TryRecvError::Disconnected) => {
1✔
3785
                            panic!("to_main channel must still be open")
1✔
3786
                        }
1✔
3787
                        Ok(_) => panic!("to_main channel must be empty"),
1✔
3788
                    }
1✔
3789
                } else {
1✔
3790
                    match to_main_rx1.try_recv() {
1✔
3791
                        Err(TryRecvError::Empty) => panic!("Transaction must be sent to main loop"),
1✔
3792
                        Err(TryRecvError::Disconnected) => {
1✔
3793
                            panic!("to_main channel must still be open")
1✔
3794
                        }
1✔
3795
                        Ok(PeerTaskToMain::Transaction(_)) => (),
1✔
3796
                        _ => panic!("Unexpected result from channel"),
1✔
3797
                    }
1✔
3798
                }
1✔
3799
            }
1✔
3800
        }
1✔
3801
    }
3802

3803
    mod sync_challenges {
3804
        use super::*;
3805
        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
3806

3807
        #[traced_test]
×
3808
        #[tokio::test]
3809
        async fn bad_sync_challenge_height_greater_than_tip() {
1✔
3810
            // Criterium: Challenge height may not exceed that of tip in the
1✔
3811
            // request.
1✔
3812

1✔
3813
            let network = Network::Main;
1✔
3814
            let (
1✔
3815
                _alice_main_to_peer_tx,
1✔
3816
                alice_main_to_peer_rx,
1✔
3817
                alice_peer_to_main_tx,
1✔
3818
                alice_peer_to_main_rx,
1✔
3819
                mut alice,
1✔
3820
                alice_hsd,
1✔
3821
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default())
1✔
3822
                .await
1✔
3823
                .unwrap();
1✔
3824
            let genesis_block: Block = Block::genesis(network);
1✔
3825
            let blocks: [Block; 11] = fake_valid_sequence_of_blocks_for_tests(
1✔
3826
                &genesis_block,
1✔
3827
                Timestamp::hours(1),
1✔
3828
                [0u8; 32],
1✔
3829
            )
1✔
3830
            .await;
1✔
3831
            for block in &blocks {
12✔
3832
                alice.set_new_tip(block.clone()).await.unwrap();
11✔
3833
            }
1✔
3834

1✔
3835
            let bh12 = blocks.last().unwrap().header().height;
1✔
3836
            let sync_challenge = SyncChallenge {
1✔
3837
                tip_digest: blocks[9].hash(),
1✔
3838
                challenges: [bh12; 10],
1✔
3839
            };
1✔
3840
            let alice_p2p_messages = Mock::new(vec![
1✔
3841
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3842
                Action::Read(PeerMessage::Bye),
1✔
3843
            ]);
1✔
3844

1✔
3845
            let peer_address = get_dummy_socket_address(0);
1✔
3846
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3847
                alice_peer_to_main_tx.clone(),
1✔
3848
                alice.clone(),
1✔
3849
                peer_address,
1✔
3850
                alice_hsd,
1✔
3851
                false,
1✔
3852
                1,
1✔
3853
            );
1✔
3854
            alice_peer_loop_handler
1✔
3855
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3856
                .await
1✔
3857
                .unwrap();
1✔
3858

1✔
3859
            drop(alice_peer_to_main_rx);
1✔
3860

1✔
3861
            let latest_sanction = alice
1✔
3862
                .lock_guard()
1✔
3863
                .await
1✔
3864
                .net
1✔
3865
                .get_peer_standing_from_database(peer_address.ip())
1✔
3866
                .await
1✔
3867
                .unwrap();
1✔
3868
            assert_eq!(
1✔
3869
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3870
                latest_sanction
1✔
3871
                    .latest_punishment
1✔
3872
                    .expect("peer must be sanctioned")
1✔
3873
                    .0
1✔
3874
            );
1✔
3875
        }
1✔
3876

3877
        #[traced_test]
×
3878
        #[tokio::test]
3879
        async fn bad_sync_challenge_genesis_block_doesnt_crash_client() {
1✔
3880
            // Criterium: Challenge may not point to genesis block, or block 1, as
1✔
3881
            // tip.
1✔
3882

1✔
3883
            let network = Network::Main;
1✔
3884
            let genesis_block: Block = Block::genesis(network);
1✔
3885

1✔
3886
            let alice_cli = cli_args::Args::default();
1✔
3887
            let (
1✔
3888
                _alice_main_to_peer_tx,
1✔
3889
                alice_main_to_peer_rx,
1✔
3890
                alice_peer_to_main_tx,
1✔
3891
                alice_peer_to_main_rx,
1✔
3892
                alice,
1✔
3893
                alice_hsd,
1✔
3894
            ) = get_test_genesis_setup(network, 0, alice_cli).await.unwrap();
1✔
3895

1✔
3896
            let sync_challenge = SyncChallenge {
1✔
3897
                tip_digest: genesis_block.hash(),
1✔
3898
                challenges: [BlockHeight::genesis(); 10],
1✔
3899
            };
1✔
3900

1✔
3901
            let alice_p2p_messages = Mock::new(vec![
1✔
3902
                Action::Read(PeerMessage::SyncChallenge(sync_challenge)),
1✔
3903
                Action::Read(PeerMessage::Bye),
1✔
3904
            ]);
1✔
3905

1✔
3906
            let peer_address = get_dummy_socket_address(0);
1✔
3907
            let mut alice_peer_loop_handler = PeerLoopHandler::new(
1✔
3908
                alice_peer_to_main_tx.clone(),
1✔
3909
                alice.clone(),
1✔
3910
                peer_address,
1✔
3911
                alice_hsd,
1✔
3912
                false,
1✔
3913
                1,
1✔
3914
            );
1✔
3915
            alice_peer_loop_handler
1✔
3916
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
3917
                .await
1✔
3918
                .unwrap();
1✔
3919

1✔
3920
            drop(alice_peer_to_main_rx);
1✔
3921

1✔
3922
            let latest_sanction = alice
1✔
3923
                .lock_guard()
1✔
3924
                .await
1✔
3925
                .net
1✔
3926
                .get_peer_standing_from_database(peer_address.ip())
1✔
3927
                .await
1✔
3928
                .unwrap();
1✔
3929
            assert_eq!(
1✔
3930
                NegativePeerSanction::InvalidSyncChallenge,
1✔
3931
                latest_sanction
1✔
3932
                    .latest_punishment
1✔
3933
                    .expect("peer must be sanctioned")
1✔
3934
                    .0
1✔
3935
            );
1✔
3936
        }
1✔
3937

3938
        #[traced_test]
×
3939
        #[tokio::test]
3940
        async fn sync_challenge_happy_path() -> Result<()> {
1✔
3941
            // Bob notifies Alice of a block whose parameters satisfy the sync mode
1✔
3942
            // criterion. Alice issues a challenge. Bob responds. Alice enters into
1✔
3943
            // sync mode.
1✔
3944

1✔
3945
            let mut rng = rand::rng();
1✔
3946
            let network = Network::Main;
1✔
3947
            let genesis_block: Block = Block::genesis(network);
1✔
3948

1✔
3949
            const ALICE_SYNC_MODE_THRESHOLD: usize = 10;
1✔
3950
            let alice_cli = cli_args::Args {
1✔
3951
                sync_mode_threshold: ALICE_SYNC_MODE_THRESHOLD,
1✔
3952
                ..Default::default()
1✔
3953
            };
1✔
3954
            let (
1✔
3955
                _alice_main_to_peer_tx,
1✔
3956
                alice_main_to_peer_rx,
1✔
3957
                alice_peer_to_main_tx,
1✔
3958
                mut alice_peer_to_main_rx,
1✔
3959
                mut alice,
1✔
3960
                alice_hsd,
1✔
3961
            ) = get_test_genesis_setup(network, 0, alice_cli).await?;
1✔
3962
            let _alice_socket_address = get_dummy_socket_address(0);
1✔
3963

1✔
3964
            let (
1✔
3965
                _bob_main_to_peer_tx,
1✔
3966
                _bob_main_to_peer_rx,
1✔
3967
                _bob_peer_to_main_tx,
1✔
3968
                _bob_peer_to_main_rx,
1✔
3969
                mut bob,
1✔
3970
                _bob_hsd,
1✔
3971
            ) = get_test_genesis_setup(network, 0, cli_args::Args::default()).await?;
1✔
3972
            let bob_socket_address = get_dummy_socket_address(0);
1✔
3973

1✔
3974
            let now = genesis_block.header().timestamp + Timestamp::hours(1);
1✔
3975
            let block_1 = fake_valid_block_for_tests(&alice, rng.random()).await;
1✔
3976
            assert!(
1✔
3977
                block_1.is_valid(&genesis_block, now).await,
1✔
3978
                "Block must be valid for this test to make sense"
1✔
3979
            );
1✔
3980
            let alice_tip = &block_1;
1✔
3981
            alice.set_new_tip(block_1.clone()).await?;
1✔
3982
            bob.set_new_tip(block_1.clone()).await?;
1✔
3983

1✔
3984
            // produce enough blocks to ensure alice needs to go into sync mode
1✔
3985
            // with this block notification.
1✔
3986
            let blocks = fake_valid_sequence_of_blocks_for_tests_dyn(
1✔
3987
                &block_1,
1✔
3988
                TARGET_BLOCK_INTERVAL,
1✔
3989
                rng.random(),
1✔
3990
                rng.random_range(ALICE_SYNC_MODE_THRESHOLD + 1..20),
1✔
3991
            )
1✔
3992
            .await;
1✔
3993
            for block in &blocks {
19✔
3994
                bob.set_new_tip(block.clone()).await?;
18✔
3995
            }
1✔
3996
            let bob_tip = blocks.last().unwrap();
1✔
3997

1✔
3998
            let block_notification_from_bob = PeerBlockNotification {
1✔
3999
                hash: bob_tip.hash(),
1✔
4000
                height: bob_tip.header().height,
1✔
4001
                cumulative_proof_of_work: bob_tip.header().cumulative_proof_of_work,
1✔
4002
            };
1✔
4003

1✔
4004
            let alice_rng_seed = rng.random::<[u8; 32]>();
1✔
4005
            let mut alice_rng_clone = StdRng::from_seed(alice_rng_seed);
1✔
4006
            let sync_challenge_from_alice = SyncChallenge::generate(
1✔
4007
                &block_notification_from_bob,
1✔
4008
                alice_tip.header().height,
1✔
4009
                alice_rng_clone.random(),
1✔
4010
            );
1✔
4011

1✔
4012
            println!(
1✔
4013
                "sync challenge from alice:\n{:?}",
1✔
4014
                sync_challenge_from_alice
1✔
4015
            );
1✔
4016

1✔
4017
            let sync_challenge_response_from_bob = bob
1✔
4018
                .lock_guard()
1✔
4019
                .await
1✔
4020
                .response_to_sync_challenge(sync_challenge_from_alice)
1✔
4021
                .await
1✔
4022
                .expect("should be able to respond to sync challenge");
1✔
4023

1✔
4024
            let alice_p2p_messages = Mock::new(vec![
1✔
4025
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4026
                Action::Write(PeerMessage::SyncChallenge(sync_challenge_from_alice)),
1✔
4027
                Action::Read(PeerMessage::SyncChallengeResponse(Box::new(
1✔
4028
                    sync_challenge_response_from_bob,
1✔
4029
                ))),
1✔
4030
                Action::Read(PeerMessage::BlockNotification(block_notification_from_bob)),
1✔
4031
                // Ensure no 2nd sync challenge is sent, as timeout has not yet passed.
1✔
4032
                // The absence of a Write here checks that a 2nd challenge isn't sent
1✔
4033
                // when a successful was just received.
1✔
4034
                Action::Read(PeerMessage::Bye),
1✔
4035
            ]);
1✔
4036

1✔
4037
            let mut alice_peer_loop_handler = PeerLoopHandler::with_mocked_time(
1✔
4038
                alice_peer_to_main_tx.clone(),
1✔
4039
                alice.clone(),
1✔
4040
                bob_socket_address,
1✔
4041
                alice_hsd,
1✔
4042
                false,
1✔
4043
                1,
1✔
4044
                bob_tip.header().timestamp,
1✔
4045
            );
1✔
4046
            alice_peer_loop_handler.set_rng(StdRng::from_seed(alice_rng_seed));
1✔
4047
            alice_peer_loop_handler
1✔
4048
                .run_wrapper(alice_p2p_messages, alice_main_to_peer_rx)
1✔
4049
                .await?;
1✔
4050

1✔
4051
            // AddPeerMaxBlockHeight message triggered *after* sync challenge
1✔
4052
            let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone();
1✔
4053
            expected_anchor_mmra.append(bob_tip.hash());
1✔
4054
            let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight {
1✔
4055
                peer_address: bob_socket_address,
1✔
4056
                claimed_height: bob_tip.header().height,
1✔
4057
                claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work,
1✔
4058
                claimed_block_mmra: expected_anchor_mmra,
1✔
4059
            };
1✔
4060
            let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap();
1✔
4061
            assert_eq!(
1✔
4062
                expected_message_from_alice_peer_loop,
1✔
4063
                observed_message_from_alice_peer_loop
1✔
4064
            );
1✔
4065

1✔
4066
            Ok(())
1✔
4067
        }
1✔
4068
    }
4069
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc