• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 13900163354

17 Mar 2025 01:04PM UTC coverage: 84.344% (+0.2%) from 84.172%
13900163354

Pull #503

github

web-flow
Merge 8872cc52d into 5618528a4
Pull Request #503: feat: Allow restriction of number of inputs per tx

359 of 377 new or added lines in 11 files covered. (95.23%)

1264 existing lines in 19 files now uncovered.

51079 of 60560 relevant lines covered (84.34%)

175910.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.68
/src/main_loop.rs
1
pub mod proof_upgrader;
2

3
use std::cmp::Ordering;
4
use std::collections::HashMap;
5
use std::net::SocketAddr;
6
use std::time::Duration;
7
use std::time::SystemTime;
8

9
use anyhow::Result;
10
use itertools::Itertools;
11
use proof_upgrader::get_upgrade_task_from_mempool;
12
use proof_upgrader::UpdateMutatorSetDataJob;
13
use proof_upgrader::UpgradeJob;
14
use rand::prelude::IteratorRandom;
15
use rand::seq::IndexedRandom;
16
use tokio::net::TcpListener;
17
use tokio::select;
18
use tokio::signal;
19
use tokio::sync::broadcast;
20
use tokio::sync::mpsc;
21
use tokio::task::JoinHandle;
22
use tokio::time;
23
use tracing::debug;
24
use tracing::error;
25
use tracing::info;
26
use tracing::trace;
27
use tracing::warn;
28

29
use crate::config_models::cli_args;
30
use crate::connect_to_peers::answer_peer;
31
use crate::connect_to_peers::call_peer;
32
use crate::job_queue::triton_vm::TritonVmJobPriority;
33
use crate::job_queue::triton_vm::TritonVmJobQueue;
34
use crate::macros::fn_name;
35
use crate::macros::log_slow_scope;
36
use crate::models::blockchain::block::block_header::BlockHeader;
37
use crate::models::blockchain::block::block_height::BlockHeight;
38
use crate::models::blockchain::block::difficulty_control::ProofOfWork;
39
use crate::models::blockchain::block::Block;
40
use crate::models::blockchain::transaction::Transaction;
41
use crate::models::blockchain::transaction::TransactionProof;
42
use crate::models::channel::MainToMiner;
43
use crate::models::channel::MainToPeerTask;
44
use crate::models::channel::MainToPeerTaskBatchBlockRequest;
45
use crate::models::channel::MinerToMain;
46
use crate::models::channel::PeerTaskToMain;
47
use crate::models::channel::RPCServerToMain;
48
use crate::models::peer::bootstrap_info::BootstrapStatus;
49
use crate::models::peer::handshake_data::HandshakeData;
50
use crate::models::peer::peer_info::PeerInfo;
51
use crate::models::peer::transaction_notification::TransactionNotification;
52
use crate::models::peer::InstanceId;
53
use crate::models::peer::PeerSynchronizationState;
54
use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
55
use crate::models::state::block_proposal::BlockProposal;
56
use crate::models::state::mempool::TransactionOrigin;
57
use crate::models::state::networking_state::NetworkingState;
58
use crate::models::state::networking_state::SyncAnchor;
59
use crate::models::state::tx_proving_capability::TxProvingCapability;
60
use crate::models::state::GlobalState;
61
use crate::models::state::GlobalStateLock;
62
use crate::SUCCESS_EXIT_CODE;
63

64
const PEER_DISCOVERY_INTERVAL_IN_SECONDS: u64 = 120;
65
const SYNC_REQUEST_INTERVAL_IN_SECONDS: u64 = 3;
66
const MEMPOOL_PRUNE_INTERVAL_IN_SECS: u64 = 30 * 60; // 30mins
67
const MP_RESYNC_INTERVAL_IN_SECS: u64 = 59;
68
const EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS: u64 = 19 * 60; // 19 mins
69

70
/// Interval for when transaction-upgrade checker is run. Note that this does
71
/// *not* define how often a transaction-proof upgrade is actually performed.
72
/// Only how often we check if we're ready to perform an upgrade.
73
const TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS: u64 = 60; // 1 minute
74

75
const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;
76

77
/// Number of seconds within which an individual peer is expected to respond
78
/// to a synchronization request.
79
const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
80
    SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS;
81

82
/// Number of seconds that a synchronization may run without any progress.
83
const GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
84
    INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS * 4;
85

86
const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
87
pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;
88
const TX_UPDATER_CHANNEL_CAPACITY: usize = 1;
89

90
/// Wraps a transmission channel.
91
///
92
/// To be used for the transmission channel to the miner, because
93
///  a) the miner might not exist in which case there would be no-one to empty
94
///     the channel; and
95
///  b) contrary to other channels, transmission failures here are not critical.
96
#[derive(Debug)]
97
struct MainToMinerChannel(Option<mpsc::Sender<MainToMiner>>);
98

99
impl MainToMinerChannel {
100
    /// Send a message to the miner task (if any).
101
    fn send(&self, message: MainToMiner) {
×
102
        // Do not use the async `send` function because it blocks until there
103
        // is spare capacity on the channel. Messages to the miner are not
104
        // critical so if there is no capacity left, just log an error
105
        // message.
106
        if let Some(channel) = &self.0 {
×
UNCOV
107
            if let Err(e) = channel.try_send(message) {
×
UNCOV
108
                error!("Failed to send pause message to miner thread:\n{e}");
×
UNCOV
109
            }
×
UNCOV
110
        }
×
UNCOV
111
    }
×
112
}
113

114
/// MainLoop is the immutable part of the input for the main loop function
115
#[derive(Debug)]
116
pub struct MainLoopHandler {
117
    incoming_peer_listener: TcpListener,
118
    global_state_lock: GlobalStateLock,
119

120
    // note: broadcast::Sender::send() does not block
121
    main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
122

123
    // note: mpsc::Sender::send() blocks if channel full.
124
    // locks should not be held across it.
125
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
126

127
    // note: MainToMinerChannel::send() does not block.  might log error.
128
    main_to_miner_tx: MainToMinerChannel,
129

130
    #[cfg(test)]
131
    mock_now: Option<SystemTime>,
132
}
133

134
/// The mutable part of the main loop function
135
struct MutableMainLoopState {
136
    /// Information used to batch-download blocks.
137
    sync_state: SyncState,
138

139
    /// Information about potential peers for new connections.
140
    potential_peers: PotentialPeersState,
141

142
    /// A list of joinhandles to spawned tasks.
143
    task_handles: Vec<JoinHandle<()>>,
144

145
    /// A joinhandle to a task performing transaction-proof upgrades.
146
    proof_upgrader_task: Option<JoinHandle<()>>,
147

148
    /// A joinhandle to a task running the update of the mempool transactions.
149
    update_mempool_txs_handle: Option<JoinHandle<()>>,
150

151
    /// A channel that the task updating mempool transactions can use to
152
    /// communicate its result.
153
    update_mempool_receiver: mpsc::Receiver<Vec<Transaction>>,
154
}
155

156
impl MutableMainLoopState {
157
    fn new(cli_args: &cli_args::Args, task_handles: Vec<JoinHandle<()>>) -> Self {
6✔
158
        let (_dummy_sender, dummy_receiver) = mpsc::channel(TX_UPDATER_CHANNEL_CAPACITY);
6✔
159
        Self {
6✔
160
            sync_state: SyncState::default(),
6✔
161
            potential_peers: PotentialPeersState::new(cli_args),
6✔
162
            task_handles,
6✔
163
            proof_upgrader_task: None,
6✔
164
            update_mempool_txs_handle: None,
6✔
165
            update_mempool_receiver: dummy_receiver,
6✔
166
        }
6✔
167
    }
6✔
168
}
169

170
/// handles batch-downloading of blocks if we are more than n blocks behind
171
#[derive(Default, Debug)]
172
struct SyncState {
173
    peer_sync_states: HashMap<SocketAddr, PeerSynchronizationState>,
174
    last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>,
175
}
176

177
impl SyncState {
178
    fn record_request(
1✔
179
        &mut self,
1✔
180
        requested_block_height: BlockHeight,
1✔
181
        peer: SocketAddr,
1✔
182
        now: SystemTime,
1✔
183
    ) {
1✔
184
        self.last_sync_request = Some((now, requested_block_height, peer));
1✔
185
    }
1✔
186

187
    /// Return a list of peers that have reported to be in possession of blocks
188
    /// with a PoW above a threshold.
189
    fn get_potential_peers_for_sync_request(&self, threshold_pow: ProofOfWork) -> Vec<SocketAddr> {
2✔
190
        self.peer_sync_states
2✔
191
            .iter()
2✔
192
            .filter(|(_sa, sync_state)| sync_state.claimed_max_pow > threshold_pow)
2✔
193
            .map(|(sa, _)| *sa)
2✔
194
            .collect()
2✔
195
    }
2✔
196

197
    /// Determine if a peer should be sanctioned for failing to respond to a
198
    /// synchronization request fast enough. Also determine if a new request
199
    /// should be made or the previous one should be allowed to run for longer.
200
    ///
201
    /// Returns (peer to be sanctioned, attempt new request).
202
    fn get_status_of_last_request(
1✔
203
        &self,
1✔
204
        current_block_height: BlockHeight,
1✔
205
        now: SystemTime,
1✔
206
    ) -> (Option<SocketAddr>, bool) {
1✔
207
        // A peer is sanctioned if no answer has been received after N times the sync request
1✔
208
        // interval.
1✔
209
        match self.last_sync_request {
1✔
210
            None => {
211
                // No sync request has been made since startup of program
212
                (None, true)
1✔
213
            }
214
            Some((req_time, requested_height, peer_sa)) => {
×
215
                if requested_height < current_block_height {
×
216
                    // The last sync request updated the state
UNCOV
217
                    (None, true)
×
UNCOV
218
                } else if req_time
×
UNCOV
219
                    + Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS)
×
220
                    < now
×
221
                {
222
                    // The last sync request was not answered, sanction peer
223
                    // and make a new sync request.
224
                    (Some(peer_sa), true)
×
225
                } else {
226
                    // The last sync request has not yet been answered. But it has
227
                    // not timed out yet.
UNCOV
228
                    (None, false)
×
229
                }
230
            }
231
        }
232
    }
1✔
233
}
234

235
/// A potential peer in the process of peer discovery.
236
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
237
struct PeerCandidate {
238
    address: SocketAddr,
239
    id: InstanceId,
240
    distance: u8,
241
}
242

243
impl PeerCandidate {
244
    fn new(address: SocketAddr, id: InstanceId, distance: u8) -> Self {
2✔
245
        Self {
2✔
246
            address,
2✔
247
            id,
2✔
248
            distance,
2✔
249
        }
2✔
250
    }
2✔
251
}
252

253
/// The potential peers in the process of peer discovery.
254
#[derive(Debug, Clone)]
255
struct PotentialPeersState {
256
    /// A copy of the corresponding field from the
257
    /// [command line arguments](cli_args::Args).
258
    max_num_peers: usize,
259

260
    candidates: HashMap<SocketAddr, PeerCandidate>,
261
}
262

263
impl PotentialPeersState {
264
    // Takes (a reference to) the entire command line arguments to facilitate
265
    // future refactors: in case additional information is needed, it can just
266
    // be grabbed here.
267
    // Storing the entire argument list is either a potentially big cloning
268
    // operation or a big lifetime challenge.
269
    fn new(cli_args: &cli_args::Args) -> Self {
8✔
270
        Self {
8✔
271
            max_num_peers: cli_args.max_num_peers,
8✔
272
            candidates: HashMap::new(),
8✔
273
        }
8✔
274
    }
8✔
275

276
    fn add(&mut self, candidate: PeerCandidate) {
2✔
277
        // always use the lowest observed distance for a given candidate
278
        if let Some(existing_candidate) = self.candidates.get_mut(&candidate.address) {
2✔
279
            if candidate.distance < existing_candidate.distance {
×
280
                *existing_candidate = candidate;
×
281
            }
×
282
            return;
×
283
        }
2✔
284

2✔
285
        // if the candidate list is full, remove random entries
2✔
286
        let max_num_candidates =
2✔
287
            self.max_num_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS;
2✔
288
        while self.candidates.len() >= max_num_candidates {
2✔
289
            let Some(&random_candidate) = self.candidates.keys().choose(&mut rand::rng()) else {
×
290
                warn!("Failed to shrink full potential peer list: couldn't find element to remove");
×
291
                return;
×
292
            };
293
            if self.candidates.remove(&random_candidate).is_none() {
×
294
                warn!("Failed to shrink full potential peer list: couldn't remove chosen element");
×
UNCOV
295
                return;
×
296
            };
×
297
        }
298

299
        self.candidates.insert(candidate.address, candidate);
2✔
300
    }
2✔
301

302
    /// Select a node that
303
    /// - is not us,
304
    /// - is no peer, and
305
    /// - if we are well-connected, is not a bootstrap node.
306
    ///
307
    /// Favors peers with a large distance and those with IP addresses we are
308
    /// not already connected to.
309
    ///
310
    /// The decision whether a potential peer is identical to the current node
311
    /// is made _only_ based on the [`InstanceId`]. This means that different
312
    /// nodes that run on the same computer _can_ connect to each other.
313
    ///
314
    /// This method requires (immutable) access to the [`NetworkingState`],
315
    /// which in turn requires the caller of this function to acquire a read
316
    /// lock of the `global_state_lock`. This design is a tradeoff; the
317
    /// alternative is to clone the various fields and then release the lock.
318
    //
319
    // A note on the design: The method uses various filters as well as a way to
320
    // compare two peer candidates in a simple iterator-combinator chain to
321
    // select the candidate.
322
    // The design's intention is to simplify future changes: declaring a new
323
    // filter as well as modifying or dropping existing filters has little to no
324
    // impact on other filters. Similarly, modifying the comparison function
325
    // does not require changes to the filters.
326
    fn peer_candidate(&self, networking_state: &NetworkingState) -> Option<PeerCandidate> {
3✔
327
        let is_self = |candidate: &PeerCandidate| candidate.id == networking_state.instance_id;
3✔
328
        let same_id_is_connected = |candidate: &PeerCandidate| {
3✔
329
            networking_state
2✔
330
                .peer_map
2✔
331
                .values()
2✔
332
                .any(|peer| peer.instance_id() == candidate.id)
42✔
333
        };
2✔
334
        let same_socket_is_connected = |candidate: &PeerCandidate| {
3✔
335
            networking_state
2✔
336
                .peer_map
2✔
337
                .values()
2✔
338
                .filter_map(|peer| peer.listen_address())
42✔
339
                .any(|address| address == candidate.address)
42✔
340
        };
2✔
341
        let is_bootstrap_node = |candidate: &PeerCandidate| {
3✔
342
            let candidate_bootstrap_status = networking_state
1✔
343
                .bootstrap_status
1✔
344
                .get(&candidate.address)
1✔
345
                .map(|bootstrap_info| bootstrap_info.status)
1✔
346
                .unwrap_or_default();
1✔
347
            candidate_bootstrap_status == BootstrapStatus::Bootstrap
1✔
348
        };
1✔
349

350
        let curr_num_peers = networking_state.peer_map.len();
3✔
351
        let self_is_well_connected = self.is_well_connected(curr_num_peers);
3✔
352

3✔
353
        self.candidates
3✔
354
            .values()
3✔
355
            .filter(|candidate| !is_self(candidate))
3✔
356
            .filter(|candidate| !same_id_is_connected(candidate))
3✔
357
            .filter(|candidate| !same_socket_is_connected(candidate))
3✔
358
            .filter(|candidate| !(self_is_well_connected && is_bootstrap_node(candidate)))
3✔
359
            .max_by(|l, r| Self::candidate_ordering(networking_state.peer_map.values(), l, r))
3✔
360
            .copied()
3✔
361
    }
3✔
362

363
    fn is_well_connected(&self, curr_num_peers: usize) -> bool {
3✔
364
        /// A node is considered minimally well-connected if it has at least
365
        /// this many peers.
366
        const NUM_PEERS_MINIMALLY_WELL_CONNECTED: usize = 3;
367

368
        /// A node is considered sufficiently well-connected if it has more
369
        /// peers than this ratio of its maximum number of peers.
370
        const SUFFICIENTLY_WELL_CONNECTED_RATIO: f32 = 0.5;
371

372
        let is_minimally_well_connected = curr_num_peers >= NUM_PEERS_MINIMALLY_WELL_CONNECTED;
3✔
373

3✔
374
        let is_sufficiently_well_connected =
3✔
375
            curr_num_peers as f32 > SUFFICIENTLY_WELL_CONNECTED_RATIO * self.max_num_peers as f32;
3✔
376

3✔
377
        is_minimally_well_connected && is_sufficiently_well_connected
3✔
378
    }
3✔
379

380
    /// Order candidates by connection status of their IP address and distance.
381
    ///
382
    /// The resulting [Ordering] is used like in [`Ord::cmp`]. For example,
383
    /// `candidate_ordering(&[], left, right) == `[`Ordering::Less`] means that
384
    /// `left` is the less-suitable candidate.
385
    //
386
    // This is not `impl Ord for PeerCandidate` because it depends on the
387
    // current peers.
UNCOV
388
    fn candidate_ordering<'pi>(
×
UNCOV
389
        current_peers: impl Iterator<Item = &'pi PeerInfo> + Clone,
×
UNCOV
390
        left: &PeerCandidate,
×
UNCOV
391
        right: &PeerCandidate,
×
UNCOV
392
    ) -> Ordering {
×
UNCOV
393
        // Does a connection to the candidate's IP exist?
×
UNCOV
394
        let ip_is_connected = |candidate: &PeerCandidate| {
×
UNCOV
395
            current_peers
×
UNCOV
396
                .clone()
×
397
                .filter_map(|peer| peer.listen_address())
×
UNCOV
398
                .any(|address| address.ip() == candidate.address.ip())
×
UNCOV
399
        };
×
400

UNCOV
401
        match (ip_is_connected(left), ip_is_connected(right)) {
×
UNCOV
402
            (true, false) => Ordering::Less,    // prefer `right`
×
UNCOV
403
            (false, true) => Ordering::Greater, // prefer `left`
×
UNCOV
404
            _ => left.distance.cmp(&right.distance),
×
405
        }
UNCOV
406
    }
×
407
}
408

409
/// Return a boolean indicating if synchronization mode should be left
UNCOV
410
fn stay_in_sync_mode(
×
UNCOV
411
    own_block_tip_header: &BlockHeader,
×
UNCOV
412
    sync_state: &SyncState,
×
UNCOV
413
    sync_mode_threshold: usize,
×
UNCOV
414
) -> bool {
×
UNCOV
415
    let max_claimed_pow = sync_state
×
UNCOV
416
        .peer_sync_states
×
UNCOV
417
        .values()
×
UNCOV
418
        .max_by_key(|x| x.claimed_max_pow);
×
UNCOV
419
    match max_claimed_pow {
×
UNCOV
420
        None => false, // No peer has passed the sync challenge phase.
×
421

422
        // Synchronization is left when the remaining number of block is half of what has
423
        // been indicated to fit into RAM
UNCOV
424
        Some(max_claim) => {
×
UNCOV
425
            own_block_tip_header.cumulative_proof_of_work < max_claim.claimed_max_pow
×
UNCOV
426
                && max_claim.claimed_max_height - own_block_tip_header.height
×
UNCOV
427
                    > sync_mode_threshold as i128 / 2
×
428
        }
429
    }
UNCOV
430
}
×
431

432
impl MainLoopHandler {
433
    pub(crate) fn new(
10✔
434
        incoming_peer_listener: TcpListener,
10✔
435
        global_state_lock: GlobalStateLock,
10✔
436
        main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
10✔
437
        peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
10✔
438
        main_to_miner_tx: mpsc::Sender<MainToMiner>,
10✔
439
    ) -> Self {
10✔
440
        let maybe_main_to_miner_tx = if global_state_lock.cli().mine() {
10✔
441
            Some(main_to_miner_tx)
×
442
        } else {
443
            None
10✔
444
        };
445
        Self {
10✔
446
            incoming_peer_listener,
10✔
447
            global_state_lock,
10✔
448
            main_to_miner_tx: MainToMinerChannel(maybe_main_to_miner_tx),
10✔
449
            main_to_peer_broadcast_tx,
10✔
450
            peer_task_to_main_tx,
10✔
451
            #[cfg(test)]
10✔
452
            mock_now: None,
10✔
453
        }
10✔
454
    }
10✔
455

456
    /// Allows for mocked timestamps such that time dependencies may be tested.
457
    #[cfg(test)]
458
    fn with_mocked_time(mut self, mocked_time: SystemTime) -> Self {
3✔
459
        self.mock_now = Some(mocked_time);
3✔
460
        self
3✔
461
    }
3✔
462

463
    fn now(&self) -> SystemTime {
7✔
464
        #[cfg(not(test))]
7✔
465
        {
7✔
466
            SystemTime::now()
7✔
467
        }
7✔
468
        #[cfg(test)]
7✔
469
        {
7✔
470
            self.mock_now.unwrap_or(SystemTime::now())
7✔
471
        }
7✔
472
    }
7✔
473

474
    /// Run a list of Triton VM prover jobs that update the mutator set state
475
    /// for transactions.
476
    ///
477
    /// Sends the result back through the provided channel.
UNCOV
478
    async fn update_mempool_jobs(
×
479
        update_jobs: Vec<UpdateMutatorSetDataJob>,
×
480
        job_queue: &TritonVmJobQueue,
×
481
        transaction_update_sender: mpsc::Sender<Vec<Transaction>>,
×
482
        proof_job_options: TritonVmProofJobOptions,
×
483
    ) {
×
484
        debug!(
×
485
            "Attempting to update transaction proofs of {} transactions",
×
UNCOV
486
            update_jobs.len()
×
487
        );
488
        let mut result = vec![];
×
489
        for job in update_jobs {
×
490
            // Jobs for updating txs in the mempool have highest priority since
491
            // they block the composer from continuing.
492
            // TODO: Handle errors better here.
UNCOV
493
            let job_result = job
×
UNCOV
494
                .upgrade(job_queue, proof_job_options.clone())
×
UNCOV
495
                .await
×
UNCOV
496
                .unwrap();
×
UNCOV
497
            result.push(job_result);
×
498
        }
499

UNCOV
500
        transaction_update_sender
×
UNCOV
501
            .send(result)
×
UNCOV
502
            .await
×
UNCOV
503
            .expect("Receiver for updated txs in main loop must still exist");
×
UNCOV
504
    }
×
505

506
    /// Handles a list of transactions whose proof has been updated with new
507
    /// mutator set data.
508
    async fn handle_updated_mempool_txs(&mut self, updated_txs: Vec<Transaction>) {
×
509
        // Update mempool with updated transactions
510
        {
UNCOV
511
            let mut state = self.global_state_lock.lock_guard_mut().await;
×
UNCOV
512
            for updated in &updated_txs {
×
UNCOV
513
                let txid = updated.kernel.txid();
×
UNCOV
514
                if let Some(tx) = state.mempool.get_mut(txid) {
×
UNCOV
515
                    *tx = updated.to_owned();
×
UNCOV
516
                } else {
×
UNCOV
517
                    warn!("Updated transaction which is no longer in mempool");
×
518
                }
519
            }
520
        }
521

522
        // Then notify all peers
UNCOV
523
        for updated in updated_txs {
×
UNCOV
524
            self.main_to_peer_broadcast_tx
×
UNCOV
525
                .send(MainToPeerTask::TransactionNotification(
×
UNCOV
526
                    (&updated).try_into().unwrap(),
×
UNCOV
527
                ))
×
UNCOV
528
                .unwrap();
×
UNCOV
529
        }
×
530

531
        // Tell miner that it can now start composing next block.
532
        self.main_to_miner_tx.send(MainToMiner::Continue);
×
533
    }
×
534

535
    /// Process a block whose PoW solution was solved by this client (or an
536
    /// external program) and has not been seen by the rest of the network yet.
537
    ///
538
    /// Shares block with all connected peers, updates own state, and updates
539
    /// any mempool transactions to be valid under this new block.
540
    ///
541
    /// Locking:
542
    ///  * acquires `global_state_lock` for write
543
    async fn handle_self_guessed_block(
1✔
544
        &mut self,
1✔
545
        main_loop_state: &mut MutableMainLoopState,
1✔
546
        new_block: Box<Block>,
1✔
547
    ) -> Result<()> {
1✔
548
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
1✔
549

550
        if !global_state_mut.incoming_block_is_more_canonical(&new_block) {
1✔
551
            drop(global_state_mut); // don't hold across send()
×
552
            warn!("Got new block from miner that was not child of tip. Discarding.");
×
553
            self.main_to_miner_tx.send(MainToMiner::Continue);
×
UNCOV
554
            return Ok(());
×
555
        }
1✔
556
        info!("Locally-mined block is new tip: {}", new_block.hash());
1✔
557

558
        // Share block with peers first thing.
559
        info!("broadcasting new block to peers");
1✔
560
        self.main_to_peer_broadcast_tx
1✔
561
            .send(MainToPeerTask::Block(new_block.clone()))
1✔
562
            .expect("Peer handler broadcast channel prematurely closed.");
1✔
563

564
        let update_jobs = global_state_mut.set_new_tip(*new_block).await?;
1✔
565
        drop(global_state_mut);
1✔
566

1✔
567
        self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);
1✔
568

1✔
569
        Ok(())
1✔
570
    }
1✔
571

572
    /// Locking:
573
    ///   * acquires `global_state_lock` for write
574
    async fn handle_miner_task_message(
×
575
        &mut self,
×
576
        msg: MinerToMain,
×
577
        main_loop_state: &mut MutableMainLoopState,
×
578
    ) -> Result<Option<i32>> {
×
579
        match msg {
×
580
            MinerToMain::NewBlockFound(new_block_info) => {
×
581
                log_slow_scope!(fn_name!() + "::MinerToMain::NewBlockFound");
×
UNCOV
582

×
UNCOV
583
                let new_block = new_block_info.block;
×
UNCOV
584

×
UNCOV
585
                info!("Miner found new block: {}", new_block.kernel.header.height);
×
586
                self.handle_self_guessed_block(main_loop_state, new_block)
×
587
                    .await?;
×
588
            }
589
            MinerToMain::BlockProposal(boxed_proposal) => {
×
UNCOV
590
                let (block, expected_utxos) = *boxed_proposal;
×
591

592
                // If block proposal from miner does not build on current tip,
593
                // don't broadcast it. This check covers reorgs as well.
594
                let current_tip = self
×
UNCOV
595
                    .global_state_lock
×
596
                    .lock_guard()
×
597
                    .await
×
598
                    .chain
UNCOV
599
                    .light_state()
×
UNCOV
600
                    .clone();
×
601
                if block.header().prev_block_digest != current_tip.hash() {
×
602
                    warn!(
×
UNCOV
603
                        "Got block proposal from miner that does not build on current tip. \
×
UNCOV
604
                           Rejecting. If this happens a lot, then maybe this machine is too \
×
UNCOV
605
                           slow to competitively compose blocks. Consider running the client only \
×
UNCOV
606
                           with the guesser flag set and not the compose flag."
×
607
                    );
UNCOV
608
                    self.main_to_miner_tx.send(MainToMiner::Continue);
×
UNCOV
609
                    return Ok(None);
×
UNCOV
610
                }
×
UNCOV
611

×
UNCOV
612
                // Ensure proposal validity before sharing
×
UNCOV
613
                if !block.is_valid(&current_tip, block.header().timestamp).await {
×
614
                    error!("Own block proposal invalid. This should not happen.");
×
615
                    self.main_to_miner_tx.send(MainToMiner::Continue);
×
616
                    return Ok(None);
×
617
                }
×
618

×
UNCOV
619
                if !self.global_state_lock.cli().secret_compositions {
×
UNCOV
620
                    self.main_to_peer_broadcast_tx
×
UNCOV
621
                    .send(MainToPeerTask::BlockProposalNotification((&block).into()))
×
622
                    .expect(
×
623
                        "Peer handler broadcast channel prematurely closed. This should never happen.",
×
624
                    );
×
625
                }
×
626

627
                {
628
                    // Use block proposal and add expected UTXOs from this
629
                    // proposal.
630
                    let mut state = self.global_state_lock.lock_guard_mut().await;
×
631
                    state.mining_state.block_proposal =
×
632
                        BlockProposal::own_proposal(block.clone(), expected_utxos.clone());
×
UNCOV
633
                    state.wallet_state.add_expected_utxos(expected_utxos).await;
×
634
                }
635

636
                // Indicate to miner that block proposal was successfully
637
                // received by main-loop.
638
                self.main_to_miner_tx.send(MainToMiner::Continue);
×
639
            }
640
            MinerToMain::Shutdown(exit_code) => {
×
641
                return Ok(Some(exit_code));
×
642
            }
643
        }
644

UNCOV
645
        Ok(None)
×
646
    }
×
647

648
    /// Locking:
649
    ///   * acquires `global_state_lock` for write
650
    async fn handle_peer_task_message(
1✔
651
        &mut self,
1✔
652
        msg: PeerTaskToMain,
1✔
653
        main_loop_state: &mut MutableMainLoopState,
1✔
654
    ) -> Result<()> {
1✔
655
        debug!("Received {} from a peer task", msg.get_type());
1✔
656
        let cli_args = self.global_state_lock.cli().clone();
1✔
657
        match msg {
1✔
UNCOV
658
            PeerTaskToMain::NewBlocks(blocks) => {
×
UNCOV
659
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::NewBlocks");
×
660

×
661
                let last_block = blocks.last().unwrap().to_owned();
×
662
                let update_jobs = {
×
663
                    // The peer tasks also check this condition, if block is more canonical than current
664
                    // tip, but we have to check it again since the block update might have already been applied
665
                    // through a message from another peer (or from own miner).
666
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
667
                    let new_canonical =
×
668
                        global_state_mut.incoming_block_is_more_canonical(&last_block);
×
669

×
670
                    if !new_canonical {
×
671
                        // The blocks are not canonical, but: if we are in sync
672
                        // mode and these blocks beat our current champion, then
673
                        // we store them anyway, without marking them as tip.
674
                        let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else {
×
UNCOV
675
                            warn!(
×
676
                                "Blocks were not new, and we're not syncing. Not storing blocks."
×
677
                            );
678
                            return Ok(());
×
679
                        };
680
                        if sync_anchor
×
681
                            .champion
×
682
                            .is_some_and(|(height, _)| height >= last_block.header().height)
×
683
                        {
UNCOV
684
                            warn!("Repeated blocks received in sync mode, not storing");
×
UNCOV
685
                            return Ok(());
×
UNCOV
686
                        }
×
UNCOV
687

×
UNCOV
688
                        sync_anchor.catch_up(last_block.header().height, last_block.hash());
×
689

UNCOV
690
                        for block in blocks {
×
UNCOV
691
                            global_state_mut.store_block_not_tip(block).await?;
×
692
                        }
693

694
                        return Ok(());
×
695
                    }
×
UNCOV
696

×
UNCOV
697
                    info!(
×
698
                        "Last block from peer is new canonical tip: {}; height: {}",
×
699
                        last_block.hash(),
×
700
                        last_block.header().height
×
701
                    );
702

703
                    // Ask miner to stop work until state update is completed
704
                    self.main_to_miner_tx.send(MainToMiner::WaitForContinue);
×
705

×
706
                    // Get out of sync mode if needed
×
707
                    if global_state_mut.net.sync_anchor.is_some() {
×
708
                        let stay_in_sync_mode = stay_in_sync_mode(
×
709
                            &last_block.kernel.header,
×
710
                            &main_loop_state.sync_state,
×
711
                            cli_args.sync_mode_threshold,
×
712
                        );
×
UNCOV
713
                        if !stay_in_sync_mode {
×
UNCOV
714
                            info!("Exiting sync mode");
×
715
                            global_state_mut.net.sync_anchor = None;
×
716
                            self.main_to_miner_tx.send(MainToMiner::StopSyncing);
×
717
                        }
×
718
                    }
×
719

720
                    let mut update_jobs: Vec<UpdateMutatorSetDataJob> = vec![];
×
721
                    for new_block in blocks {
×
722
                        debug!(
×
723
                            "Storing block {} in database. Height: {}, Mined: {}",
×
724
                            new_block.hash(),
×
725
                            new_block.kernel.header.height,
×
726
                            new_block.kernel.header.timestamp.standard_format()
×
727
                        );
728

729
                        // Potential race condition here.
730
                        // What if last block is new and canonical, but first
731
                        // block is already known then we'll store the same block
732
                        // twice. That should be OK though, as the appropriate
733
                        // database entries are simply overwritten with the new
734
                        // block info. See the
735
                        // [GlobalState::test::setting_same_tip_twice_is_allowed]
736
                        // test for a test of this phenomenon.
737

738
                        let update_jobs_ = global_state_mut.set_new_tip(new_block).await?;
×
739
                        update_jobs.extend(update_jobs_);
×
740
                    }
741

742
                    update_jobs
×
UNCOV
743
                };
×
UNCOV
744

×
745
                // Inform all peers about new block
×
746
                self.main_to_peer_broadcast_tx
×
747
                    .send(MainToPeerTask::Block(Box::new(last_block.clone())))
×
748
                    .expect("Peer handler broadcast was closed. This should never happen");
×
UNCOV
749

×
750
                // Spawn task to handle mempool tx-updating after new blocks.
×
751
                // TODO: Do clever trick to collapse all jobs relating to the same transaction,
×
752
                //       identified by transaction-ID, into *one* update job.
×
753
                self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);
×
754

×
UNCOV
755
                // Inform miner about new block.
×
UNCOV
756
                self.main_to_miner_tx.send(MainToMiner::NewBlock);
×
757
            }
758
            PeerTaskToMain::AddPeerMaxBlockHeight {
759
                peer_address,
×
760
                claimed_height,
×
UNCOV
761
                claimed_cumulative_pow,
×
UNCOV
762
                claimed_block_mmra,
×
763
            } => {
×
UNCOV
764
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight");
×
765

×
766
                let claimed_state =
×
767
                    PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow);
×
768
                main_loop_state
×
769
                    .sync_state
×
770
                    .peer_sync_states
×
771
                    .insert(peer_address, claimed_state);
×
772

773
                // Check if synchronization mode should be activated.
774
                // Synchronization mode is entered if accumulated PoW exceeds
775
                // our tip and if the height difference is positive and beyond
776
                // a threshold value.
777
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
778
                if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow)
×
779
                    && global_state_mut
×
780
                        .net
×
781
                        .sync_anchor
×
782
                        .as_ref()
×
783
                        .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow)
×
784
                {
785
                    info!(
×
786
                        "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}",
×
787
                        peer_address, claimed_height, claimed_cumulative_pow
788
                    );
789
                    global_state_mut.net.sync_anchor =
×
UNCOV
790
                        Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra));
×
791
                    self.main_to_miner_tx.send(MainToMiner::StartSyncing);
×
792
                }
×
793
            }
794
            PeerTaskToMain::RemovePeerMaxBlockHeight(socket_addr) => {
×
795
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::RemovePeerMaxBlockHeight");
×
796

×
797
                debug!(
×
798
                    "Removing max block height from sync data structure for peer {}",
×
799
                    socket_addr
800
                );
801
                main_loop_state
×
802
                    .sync_state
×
803
                    .peer_sync_states
×
UNCOV
804
                    .remove(&socket_addr);
×
805

806
                // Get out of sync mode if needed.
807
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
808

NEW
809
                if global_state_mut.net.sync_anchor.is_some() {
×
NEW
810
                    let stay_in_sync_mode = stay_in_sync_mode(
×
811
                        global_state_mut.chain.light_state().header(),
×
812
                        &main_loop_state.sync_state,
×
813
                        cli_args.sync_mode_threshold,
×
814
                    );
×
NEW
815
                    if !stay_in_sync_mode {
×
NEW
816
                        info!("Exiting sync mode");
×
NEW
817
                        global_state_mut.net.sync_anchor = None;
×
NEW
818
                    }
×
NEW
819
                }
×
820
            }
NEW
821
            PeerTaskToMain::PeerDiscoveryAnswer(potential_peers, distance) => {
×
822
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer");
×
823
                for (address, id) in potential_peers {
×
824
                    let candidate = PeerCandidate::new(address, id, distance);
×
825
                    main_loop_state.potential_peers.add(candidate);
×
826
                }
×
827
            }
UNCOV
828
            PeerTaskToMain::Transaction(pt2m_transaction) => {
×
829
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::Transaction");
×
830

×
831
                debug!(
×
832
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
UNCOV
833
                    pt2m_transaction.transaction.kernel.inputs.len(),
×
UNCOV
834
                    pt2m_transaction.transaction.kernel.outputs.len(),
×
UNCOV
835
                    pt2m_transaction.transaction.kernel.mutator_set_hash
×
836
                );
837

UNCOV
838
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
UNCOV
839
                if pt2m_transaction.confirmable_for_block
×
UNCOV
840
                    != global_state_mut.chain.light_state().hash()
×
841
                {
UNCOV
842
                    warn!("main loop got unmined transaction with bad mutator set data, discarding transaction");
×
843
                    return Ok(());
×
844
                }
×
845

846
                // Insert into mempool, if allowed.
847
                if let Err(err) = global_state_mut
×
848
                    .mempool_insert(
×
849
                        pt2m_transaction.transaction.to_owned(),
×
850
                        TransactionOrigin::Foreign,
×
851
                    )
×
852
                    .await
×
853
                {
854
                    warn!("cannot add transaction into mempool: {err}");
×
855
                    return Ok(());
×
856
                }
×
857

858
                // send notification to peers, if tx accepted by mempool.
859
                let transaction_notification: TransactionNotification =
×
860
                    (&pt2m_transaction.transaction).try_into()?;
×
UNCOV
861
                self.main_to_peer_broadcast_tx
×
862
                    .send(MainToPeerTask::TransactionNotification(
×
UNCOV
863
                        transaction_notification,
×
UNCOV
864
                    ))?;
×
865
            }
UNCOV
866
            PeerTaskToMain::BlockProposal(block) => {
×
UNCOV
867
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::BlockProposal");
×
UNCOV
868

×
UNCOV
869
                debug!("main loop received block proposal from peer loop");
×
870

871
                // Due to race-conditions, we need to verify that this
872
                // block proposal is still the immediate child of tip. If it is,
873
                // and it has a higher guesser fee than what we're currently
874
                // working on, then we switch to this, and notify the miner to
875
                // mine on this new block. We don't need to verify the block's
876
                // validity, since that was done in peer loop.
877
                // To ensure atomicity, a write-lock must be held over global
878
                // state while we check if this proposal is favorable.
879
                {
UNCOV
880
                    info!("Received new favorable block proposal for mining operation.");
×
UNCOV
881
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
UNCOV
882
                    let verdict = global_state_mut.favor_incoming_block_proposal(
×
UNCOV
883
                        block.header().height,
×
UNCOV
884
                        block.total_guesser_reward(),
×
UNCOV
885
                    );
×
UNCOV
886
                    if let Err(reject_reason) = verdict {
×
UNCOV
887
                        warn!("main loop got unfavorable block proposal. Reason: {reject_reason}");
×
888
                        return Ok(());
×
UNCOV
889
                    }
×
UNCOV
890

×
UNCOV
891
                    global_state_mut.mining_state.block_proposal =
×
UNCOV
892
                        BlockProposal::foreign_proposal(*block.clone());
×
UNCOV
893
                }
×
UNCOV
894

×
UNCOV
895
                // Notify all peers of the block proposal we just accepted
×
UNCOV
896
                self.main_to_peer_broadcast_tx
×
UNCOV
897
                    .send(MainToPeerTask::BlockProposalNotification((&*block).into()))?;
×
898

UNCOV
899
                self.main_to_miner_tx.send(MainToMiner::NewBlockProposal);
×
900
            }
901
            PeerTaskToMain::DisconnectFromLongestLivedPeer => {
902
                let global_state = self.global_state_lock.lock_guard().await;
1✔
903

904
                // get all peers
905
                let all_peers = global_state.net.peer_map.iter();
1✔
906

1✔
907
                // filter out CLI peers
1✔
908
                let disconnect_candidates =
1✔
909
                    all_peers.filter(|p| !global_state.cli_peers().contains(p.0));
5✔
910

1✔
911
                // find the one with the oldest connection
1✔
912
                let longest_lived_peer = disconnect_candidates.min_by(
1✔
913
                    |(_socket_address_left, peer_info_left),
1✔
914
                     (_socket_address_right, peer_info_right)| {
4✔
915
                        peer_info_left
4✔
916
                            .connection_established()
4✔
917
                            .cmp(&peer_info_right.connection_established())
4✔
918
                    },
4✔
919
                );
1✔
920

921
                // tell to disconnect
922
                if let Some((peer_socket, _peer_info)) = longest_lived_peer {
1✔
923
                    self.main_to_peer_broadcast_tx
1✔
924
                        .send(MainToPeerTask::Disconnect(peer_socket.to_owned()))?;
1✔
UNCOV
925
                }
×
926
            }
927
        }
928

929
        Ok(())
1✔
930
    }
1✔
931

932
    /// If necessary, disconnect from peers.
933
    ///
934
    /// While a reasonable effort is made to never have more connections than
935
    /// [`max_num_peers`](crate::config_models::cli_args::Args::max_num_peers),
936
    /// this is not guaranteed. For example, bootstrap nodes temporarily allow a
937
    /// surplus of incoming connections to provide their service more reliably.
938
    ///
939
    /// Never disconnects peers listed as CLI arguments.
940
    ///
941
    /// Locking:
942
    ///   * acquires `global_state_lock` for read
943
    async fn prune_peers(&self) -> Result<()> {
2✔
944
        // fetch all relevant info from global state; don't hold the lock
2✔
945
        let cli_args = self.global_state_lock.cli();
2✔
946
        let connected_peers = self
2✔
947
            .global_state_lock
2✔
948
            .lock_guard()
2✔
949
            .await
2✔
950
            .net
951
            .peer_map
952
            .values()
2✔
953
            .cloned()
2✔
954
            .collect_vec();
2✔
955

2✔
956
        let num_peers = connected_peers.len();
2✔
957
        let max_num_peers = cli_args.max_num_peers;
2✔
958
        if num_peers <= max_num_peers {
2✔
959
            debug!("No need to prune any peer connections.");
1✔
960
            return Ok(());
1✔
961
        }
1✔
962
        warn!("Connected to {num_peers} peers, which exceeds the maximum ({max_num_peers}).");
1✔
963

964
        // If all connections are outbound, it's OK to exceed the max.
965
        if connected_peers.iter().all(|p| p.connection_is_outbound()) {
9✔
966
            warn!("Not disconnecting from any peer because all connections are outbound.");
×
967
            return Ok(());
×
968
        }
1✔
969

1✔
970
        let num_peers_to_disconnect = num_peers - max_num_peers;
1✔
971
        let peers_to_disconnect = connected_peers
1✔
972
            .into_iter()
1✔
973
            .filter(|peer| !cli_args.peers.contains(&peer.connected_address()))
14✔
974
            .choose_multiple(&mut rand::rng(), num_peers_to_disconnect);
1✔
975
        match peers_to_disconnect.len() {
1✔
UNCOV
976
            0 => warn!("Not disconnecting from any peer because of manual override."),
×
977
            i => info!("Disconnecting from {i} peers."),
1✔
978
        }
979
        for peer in peers_to_disconnect {
5✔
980
            self.main_to_peer_broadcast_tx
4✔
981
                .send(MainToPeerTask::Disconnect(peer.connected_address()))?;
4✔
982
        }
983

984
        Ok(())
1✔
985
    }
2✔
986

987
    /// If necessary, reconnect to the peers listed as CLI arguments.
    ///
    /// Compares the CLI-specified peer list against the currently connected
    /// peers and spawns an outgoing-connection task for each CLI peer that is
    /// not currently connected, unless that peer is in bad standing.
    ///
    /// Returns an error if spawning an outgoing-connection task fails.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn reconnect(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        // Snapshot currently-connected peer addresses under the read lock.
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .keys()
            .copied()
            .collect_vec();
        // CLI peers that do not appear in the connected set.
        let peers_with_lost_connection = self
            .global_state_lock
            .cli()
            .peers
            .iter()
            .filter(|peer| !connected_peers.contains(peer));

        // If no connection was lost, there's nothing to do.
        // The iterator is cloned so it can still be consumed by the loop below.
        if peers_with_lost_connection.clone().count() == 0 {
            return Ok(());
        }

        // Else, try to reconnect.
        // Handshake data is fetched once and cloned per spawned task.
        let own_handshake_data = self
            .global_state_lock
            .lock_guard()
            .await
            .get_own_handshakedata();
        for &peer_with_lost_connection in peers_with_lost_connection {
            // Disallow reconnection if peer is in bad standing
            // NOTE(review): the read lock is re-acquired per candidate; the
            // standing lookup is async (database-backed).
            let peer_standing = self
                .global_state_lock
                .lock_guard()
                .await
                .net
                .get_peer_standing_from_database(peer_with_lost_connection.ip())
                .await;
            if peer_standing.is_some_and(|standing| standing.is_bad()) {
                info!("Not reconnecting to peer in bad standing: {peer_with_lost_connection}");
                continue;
            }

            info!("Attempting to reconnect to peer: {peer_with_lost_connection}");
            // Clone the handles the spawned task needs to own.
            let global_state_lock = self.global_state_lock.clone();
            let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
            let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
            let own_handshake_data = own_handshake_data.clone();
            // Spawn the outgoing connection; `?` propagates a spawn failure,
            // aborting reconnection attempts for any remaining peers.
            let outgoing_connection_task = tokio::task::Builder::new()
                .name("call_peer_wrapper_1")
                .spawn(async move {
                    call_peer(
                        peer_with_lost_connection,
                        global_state_lock,
                        main_to_peer_broadcast_rx,
                        peer_task_to_main_tx,
                        own_handshake_data,
                        1, // All CLI-specified peers have distance 1
                    )
                    .await;
                })?;
            // Track the new task handle and drop handles of finished tasks.
            main_loop_state.task_handles.push(outgoing_connection_task);
            main_loop_state.task_handles.retain(|th| !th.is_finished());
        }

        Ok(())
    }
1057

1058
    /// Perform peer discovery.
1059
    ///
1060
    /// Peer discovery involves finding potential peers from connected peers
1061
    /// and attempts to establish a connection with one of them.
1062
    ///
1063
    /// Locking:
1064
    ///   * acquires `global_state_lock` for read
1065
    async fn discover_peers(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
2✔
1066
        // fetch all relevant info from global state, then release the lock
2✔
1067
        let cli_args = self.global_state_lock.cli();
2✔
1068
        let global_state = self.global_state_lock.lock_guard().await;
2✔
1069
        let num_peers = global_state.net.peer_map.len();
2✔
1070
        let own_handshake_data = global_state.get_own_handshakedata();
2✔
1071
        drop(global_state);
2✔
1072

2✔
1073
        let max_num_peers = cli_args.max_num_peers;
2✔
1074

2✔
1075
        // Don't make an outgoing connection if
2✔
1076
        // - the peer limit is reached (or exceeded), or
2✔
1077
        // - the peer limit is _almost_ reached; reserve the last slot for an
2✔
1078
        //   incoming connection.
2✔
1079
        if num_peers >= max_num_peers || num_peers > 2 && num_peers - 1 == max_num_peers {
2✔
1080
            info!("Connected to {num_peers} peers. The configured max is {max_num_peers} peers.");
1✔
1081
            info!("Skipping peer discovery.");
1✔
1082
            return Ok(());
1✔
1083
        }
1✔
1084

1✔
1085
        info!("Performing peer discovery");
1✔
1086

1087
        // Ask all peers for their peer lists. This will eventually – once the
1088
        // responses have come in – update the list of potential peers.
1089
        self.main_to_peer_broadcast_tx
1✔
1090
            .send(MainToPeerTask::MakePeerDiscoveryRequest)?;
1✔
1091

1092
        // Get a peer candidate from the list of potential peers. Generally,
1093
        // the peer lists requested in the previous step will not have come in
1094
        // yet. Therefore, the new candidate is selected based on somewhat
1095
        // (but not overly) old information.
1096
        let Some(peer_candidate) = main_loop_state
1✔
1097
            .potential_peers
1✔
1098
            .peer_candidate(&self.global_state_lock.lock_guard().await.net)
1✔
1099
        else {
1100
            info!("Found no peer candidate to connect to. Not making new connection.");
1✔
1101
            return Ok(());
1✔
1102
        };
1103

1104
        // Try to connect to the selected candidate.
UNCOV
1105
        info!(
×
UNCOV
1106
            "Connecting to peer {address} with distance {distance}",
×
1107
            address = peer_candidate.address,
1108
            distance = peer_candidate.distance
1109
        );
UNCOV
1110
        let global_state_lock = self.global_state_lock.clone();
×
UNCOV
1111
        let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
×
UNCOV
1112
        let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
×
UNCOV
1113
        let outgoing_connection_task = tokio::task::Builder::new()
×
UNCOV
1114
            .name("call_peer_for_new_connection_from_peer_discovery")
×
UNCOV
1115
            .spawn(async move {
×
UNCOV
1116
                call_peer(
×
UNCOV
1117
                    peer_candidate.address,
×
UNCOV
1118
                    global_state_lock,
×
UNCOV
1119
                    main_to_peer_broadcast_rx,
×
UNCOV
1120
                    peer_task_to_main_tx,
×
UNCOV
1121
                    own_handshake_data,
×
UNCOV
1122
                    peer_candidate.distance,
×
UNCOV
1123
                )
×
UNCOV
1124
                .await;
×
UNCOV
1125
            })?;
×
UNCOV
1126
        main_loop_state.task_handles.push(outgoing_connection_task);
×
UNCOV
1127
        main_loop_state.task_handles.retain(|th| !th.is_finished());
×
UNCOV
1128

×
UNCOV
1129
        // Immediately request the new peer's peer list. This allows
×
UNCOV
1130
        // incorporating the new peer's peers into the list of potential peers,
×
UNCOV
1131
        // to be used in the next round of peer discovery.
×
UNCOV
1132
        let peer_discovery_request =
×
UNCOV
1133
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(peer_candidate.address);
×
UNCOV
1134
        self.main_to_peer_broadcast_tx
×
UNCOV
1135
            .send(peer_discovery_request)?;
×
1136

UNCOV
1137
        Ok(())
×
1138
    }
2✔
1139

1140
    /// Return a list of block heights for a block-batch request.
1141
    ///
1142
    /// Returns an ordered list of the heights of *most preferred block*
1143
    /// to build on, where current tip is always the most preferred block.
1144
    ///
1145
    /// Uses a factor to ensure that the peer will always have something to
1146
    /// build on top of by providing potential starting points all the way
1147
    /// back to genesis.
1148
    fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec<BlockHeight> {
258✔
1149
        const FACTOR: f64 = 1.07f64;
1150

1151
        let mut look_behind = 0;
258✔
1152
        let mut ret = vec![];
258✔
1153

1154
        // A factor of 1.07 can look back ~1m blocks in 200 digests.
1155
        while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 {
51,374✔
1156
            let height = match own_tip_height.checked_sub(look_behind) {
51,118✔
1157
                None => break,
1✔
1158
                Some(height) if height.is_genesis() => break,
51,117✔
1159
                Some(height) => height,
51,116✔
1160
            };
51,116✔
1161

51,116✔
1162
            ret.push(height);
51,116✔
1163
            look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64;
51,116✔
1164
        }
1165

1166
        ret.push(BlockHeight::genesis());
258✔
1167

258✔
1168
        ret
258✔
1169
    }
258✔
1170

1171
    /// Logic for requesting the batch-download of blocks from peers
    ///
    /// Does nothing unless sync mode is active. Abandons sync mode (and
    /// sanctions the peers that advertised the blocks) if the global sync
    /// timeout has elapsed; otherwise, possibly sanctions a non-responsive
    /// peer, and — if no request is pending — sends a new block-batch request
    /// to a randomly chosen candidate peer.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let global_state = self.global_state_lock.lock_guard().await;

        // Check if we are in sync mode
        let Some(anchor) = &global_state.net.sync_anchor else {
            return Ok(());
        };

        info!("Running sync");

        // Snapshot tip identity and cumulative PoW for use after the read
        // guard may have been dropped (timeout branch below).
        let (own_tip_hash, own_tip_height, own_cumulative_pow) = (
            global_state.chain.light_state().hash(),
            global_state.chain.light_state().kernel.header.height,
            global_state
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work,
        );

        // Check if sync mode has timed out entirely, in which case it should
        // be abandoned.
        let anchor = anchor.to_owned();
        if self.now().duration_since(anchor.updated)?.as_secs()
            > GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS
        {
            warn!("Sync mode has timed out. Abandoning sync mode.");

            // Abandon attempt, and punish all peers claiming to serve these
            // blocks.
            // The read guard must be dropped before taking the write lock.
            drop(global_state);
            self.global_state_lock
                .lock_guard_mut()
                .await
                .net
                .sync_anchor = None;

            let peers_to_punish = main_loop_state
                .sync_state
                .get_potential_peers_for_sync_request(own_cumulative_pow);

            for peer in peers_to_punish {
                self.main_to_peer_broadcast_tx
                    .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?;
            }

            return Ok(());
        }

        // Determine whether the previous request timed out (peer to sanction)
        // and whether a new request should be issued now.
        let (peer_to_sanction, try_new_request): (Option<SocketAddr>, bool) = main_loop_state
            .sync_state
            .get_status_of_last_request(own_tip_height, self.now());

        // Sanction peer if they failed to respond
        if let Some(peer) = peer_to_sanction {
            self.main_to_peer_broadcast_tx
                .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?;
        }

        if !try_new_request {
            info!("Waiting for last sync to complete.");
            return Ok(());
        }

        // Create the next request from the reported
        info!("Creating new sync request");

        // Pick a random peer that has reported to have relevant blocks
        let candidate_peers = main_loop_state
            .sync_state
            .get_potential_peers_for_sync_request(own_cumulative_pow);
        let mut rng = rand::rng();
        let chosen_peer = candidate_peers.choose(&mut rng);
        // Invariant: sync mode is only active while at least one peer claims
        // to serve the blocks; an empty candidate list here is a bug.
        assert!(
            chosen_peer.is_some(),
            "A synchronization candidate must be available for a request. \
            Otherwise, the data structure is in an invalid state and syncing should not be active"
        );

        // Prefer the champion block (best block seen so far in this sync) as
        // the sole known-block anchor; otherwise offer a sparse ladder of own
        // block digests down to genesis so the peer can find a common ancestor.
        let ordered_preferred_block_digests = match anchor.champion {
            Some((_height, digest)) => vec![digest],
            None => {
                // Find candidate-UCA digests based on a sparse distribution of
                // block heights skewed towards own tip height
                let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height);
                let mut ordered_preferred_block_digests = vec![];
                for height in request_heights {
                    // NOTE(review): archival MMR leaf reads are awaited while
                    // the global read guard is still held.
                    let digest = global_state
                        .chain
                        .archival_state()
                        .archival_block_mmr
                        .ammr()
                        .get_leaf_async(height.into())
                        .await;
                    ordered_preferred_block_digests.push(digest);
                }
                ordered_preferred_block_digests
            }
        };

        // Send message to the relevant peer loop to request the blocks
        let chosen_peer = chosen_peer.unwrap();
        info!(
            "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}",
            chosen_peer, own_tip_hash, own_tip_height
        );
        self.main_to_peer_broadcast_tx
            .send(MainToPeerTask::RequestBlockBatch(
                MainToPeerTaskBatchBlockRequest {
                    peer_addr_target: *chosen_peer,
                    known_blocks: ordered_preferred_block_digests,
                    anchor_mmr: anchor.block_mmr.clone(),
                },
            ))
            .expect("Sending message to peers must succeed");

        // Record that this request was sent to the peer
        let requested_block_height = own_tip_height.next();
        main_loop_state
            .sync_state
            .record_request(requested_block_height, *chosen_peer, self.now());

        Ok(())
    }
1300

1301
    /// Scheduled task for upgrading the proofs of transactions in the mempool.
    ///
    /// Will either perform a merge of two transactions supported with single
    /// proofs, or will upgrade a transaction proof of the type
    /// `ProofCollection` to `SingleProof`.
    ///
    /// All proving takes place in a spawned task such that it doesn't block
    /// the main loop. The MutableMainLoopState gets the JoinHandle of the
    /// spawned upgrade task such that its status can be inspected.
    async fn proof_upgrader(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        // Gate-keeper: returns Ok(true) only when *all* preconditions for an
        // upgrade attempt hold: not syncing, node can produce single proofs,
        // no previous upgrade task still running, and the configured interval
        // since the last attempt has elapsed.
        fn attempt_upgrade(
            global_state: &GlobalState,
            now: SystemTime,
            tx_upgrade_interval: Option<Duration>,
            main_loop_state: &MutableMainLoopState,
        ) -> Result<bool> {
            // Errors if `now` precedes the last attempt (clock went backwards).
            let duration_since_last_upgrade =
                now.duration_since(global_state.net.last_tx_proof_upgrade_attempt)?;
            let previous_upgrade_task_is_still_running = main_loop_state
                .proof_upgrader_task
                .as_ref()
                .is_some_and(|x| !x.is_finished());
            Ok(global_state.net.sync_anchor.is_none()
                && global_state.proving_capability() == TxProvingCapability::SingleProof
                && !previous_upgrade_task_is_still_running
                && tx_upgrade_interval
                    .is_some_and(|upgrade_interval| duration_since_last_upgrade > upgrade_interval))
        }

        trace!("Running proof upgrader scheduled task");

        // Check if it's time to run the proof-upgrader, and if we're capable
        // of upgrading a transaction proof.
        let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval();
        // Scope block: the read-lock on global state is dropped at the end of
        // this block, *before* the long-running upgrade task is spawned.
        let (upgrade_candidate, tx_origin) = {
            let global_state = self.global_state_lock.lock_guard().await;
            let now = self.now();
            if !attempt_upgrade(&global_state, now, tx_upgrade_interval, main_loop_state)? {
                trace!("Not attempting upgrade.");
                return Ok(());
            }

            debug!("Attempting to run transaction-proof-upgrade");

            // Find a candidate for proof upgrade
            let Some((upgrade_candidate, tx_origin)) = get_upgrade_task_from_mempool(&global_state)
            else {
                debug!("Found no transaction-proof to upgrade");
                return Ok(());
            };

            (upgrade_candidate, tx_origin)
        };

        info!(
            "Attempting to upgrade transaction proofs of: {}",
            upgrade_candidate.affected_txids().iter().join("; ")
        );

        // Perform the upgrade, if we're not using the prover for anything else,
        // like mining, or proving our own transaction. Running the prover takes
        // a long time (minutes), so we spawn a task for this such that we do
        // not block the main loop.
        let vm_job_queue = self.global_state_lock.vm_job_queue().clone();
        // NOTE(review): this reads cli().proving_capability() while
        // attempt_upgrade() above used global_state.proving_capability() —
        // presumably equivalent here; confirm the two cannot diverge.
        let perform_ms_update_if_needed =
            self.global_state_lock.cli().proving_capability() == TxProvingCapability::SingleProof;

        let global_state_lock_clone = self.global_state_lock.clone();
        let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
        let proof_upgrader_task =
            tokio::task::Builder::new()
                .name("proof_upgrader")
                .spawn(async move {
                    upgrade_candidate
                        .handle_upgrade(
                            &vm_job_queue,
                            tx_origin,
                            perform_ms_update_if_needed,
                            global_state_lock_clone,
                            main_to_peer_broadcast_tx_clone,
                        )
                        .await
                })?;

        // Remember the handle so the next scheduled run can detect an
        // in-progress upgrade (see attempt_upgrade above).
        main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);

        Ok(())
    }
3✔
1389

1390
    /// Post-processing when new block has arrived. Spawn a task to update
1391
    /// transactions in the mempool. Only when the spawned task has completed,
1392
    /// should the miner continue.
1393
    fn spawn_mempool_txs_update_job(
1✔
1394
        &self,
1✔
1395
        main_loop_state: &mut MutableMainLoopState,
1✔
1396
        update_jobs: Vec<UpdateMutatorSetDataJob>,
1✔
1397
    ) {
1✔
1398
        // job completion of the spawned task is communicated through the
1✔
1399
        // `update_mempool_txs_handle` channel.
1✔
1400
        let vm_job_queue = self.global_state_lock.vm_job_queue().clone();
1✔
1401
        if let Some(handle) = main_loop_state.update_mempool_txs_handle.as_ref() {
1✔
1402
            handle.abort();
×
1403
        }
1✔
1404
        let (update_sender, update_receiver) =
1✔
1405
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);
1✔
1406

1✔
1407
        // note: if this task is cancelled, the job will continue
1✔
1408
        // because TritonVmJobOptions::cancel_job_rx is None.
1✔
1409
        // see how compose_task handles cancellation in mine_loop.
1✔
1410
        let job_options = self
1✔
1411
            .global_state_lock
1✔
1412
            .cli()
1✔
1413
            .proof_job_options(TritonVmJobPriority::Highest);
1✔
1414
        main_loop_state.update_mempool_txs_handle = Some(
1✔
1415
            tokio::task::Builder::new()
1✔
1416
                .name("mempool tx ms-updater")
1✔
1417
                .spawn(async move {
1✔
1418
                    Self::update_mempool_jobs(
×
1419
                        update_jobs,
×
1420
                        &vm_job_queue,
×
1421
                        update_sender,
×
1422
                        job_options,
×
1423
                    )
×
1424
                    .await
×
1425
                })
1✔
1426
                .unwrap(),
1✔
1427
        );
1✔
1428
        main_loop_state.update_mempool_receiver = update_receiver;
1✔
1429
    }
1✔
1430

1431
    /// The node's main event loop.
    ///
    /// Dispatches on: OS shutdown signals (Ctrl+C, and SIGTERM/SIGINT/SIGQUIT
    /// on Unix), incoming peer connections, messages from the peer, miner and
    /// RPC-server tasks, and a set of periodic maintenance timers. Loops until
    /// a shutdown condition is met, then performs a graceful shutdown and
    /// returns the process exit code.
    pub(crate) async fn run(
        &mut self,
        mut peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
        mut miner_to_main_rx: mpsc::Receiver<MinerToMain>,
        mut rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
        task_handles: Vec<JoinHandle<()>>,
    ) -> Result<i32> {
        // Handle incoming connections, messages from peer tasks, and messages from the mining task
        let mut main_loop_state =
            MutableMainLoopState::new(self.global_state_lock.cli(), task_handles);

        // Set peer discovery to run every N seconds. All timers must be reset
        // every time they have run.
        let peer_discovery_timer_interval = Duration::from_secs(PEER_DISCOVERY_INTERVAL_IN_SECONDS);
        let peer_discovery_timer = time::sleep(peer_discovery_timer_interval);
        tokio::pin!(peer_discovery_timer);

        // Set synchronization to run every M seconds.
        let block_sync_interval = Duration::from_secs(SYNC_REQUEST_INTERVAL_IN_SECONDS);
        let block_sync_timer = time::sleep(block_sync_interval);
        tokio::pin!(block_sync_timer);

        // Set removal of transactions from mempool.
        let mempool_cleanup_interval = Duration::from_secs(MEMPOOL_PRUNE_INTERVAL_IN_SECS);
        let mempool_cleanup_timer = time::sleep(mempool_cleanup_interval);
        tokio::pin!(mempool_cleanup_timer);

        // Set removal of stale notifications for incoming UTXOs.
        let utxo_notification_cleanup_interval =
            Duration::from_secs(EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS);
        let utxo_notification_cleanup_timer = time::sleep(utxo_notification_cleanup_interval);
        tokio::pin!(utxo_notification_cleanup_timer);

        // Set restoration of membership proofs to run every Q seconds.
        let mp_resync_interval = Duration::from_secs(MP_RESYNC_INTERVAL_IN_SECS);
        let mp_resync_timer = time::sleep(mp_resync_interval);
        tokio::pin!(mp_resync_timer);

        // Set transaction-proof-upgrade-checker to run every R seconds.
        let tx_proof_upgrade_interval =
            Duration::from_secs(TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS);
        let tx_proof_upgrade_timer = time::sleep(tx_proof_upgrade_interval);
        tokio::pin!(tx_proof_upgrade_timer);

        // Spawn tasks to monitor for SIGTERM, SIGINT, and SIGQUIT. These
        // signals are only used on Unix systems.
        let (tx_term, mut rx_term): (mpsc::Sender<()>, mpsc::Receiver<()>) =
            tokio::sync::mpsc::channel(2);
        let (tx_int, mut rx_int): (mpsc::Sender<()>, mpsc::Receiver<()>) =
            tokio::sync::mpsc::channel(2);
        let (tx_quit, mut rx_quit): (mpsc::Sender<()>, mpsc::Receiver<()>) =
            tokio::sync::mpsc::channel(2);
        #[cfg(unix)]
        {
            use tokio::signal::unix::signal;
            use tokio::signal::unix::SignalKind;

            // Monitor for SIGTERM
            let mut sigterm = signal(SignalKind::terminate())?;
            tokio::task::Builder::new()
                .name("sigterm_handler")
                .spawn(async move {
                    if sigterm.recv().await.is_some() {
                        info!("Received SIGTERM");
                        tx_term.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGINT
            let mut sigint = signal(SignalKind::interrupt())?;
            tokio::task::Builder::new()
                .name("sigint_handler")
                .spawn(async move {
                    if sigint.recv().await.is_some() {
                        info!("Received SIGINT");
                        tx_int.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGQUIT
            let mut sigquit = signal(SignalKind::quit())?;
            tokio::task::Builder::new()
                .name("sigquit_handler")
                .spawn(async move {
                    if sigquit.recv().await.is_some() {
                        info!("Received SIGQUIT");
                        tx_quit.send(()).await.unwrap();
                    }
                })?;
        }

        // On non-Unix targets the senders are unused; drop them so the
        // corresponding `recv()` select-branches become permanently disabled.
        #[cfg(not(unix))]
        drop((tx_term, tx_int, tx_quit));

        let exit_code: i32 = loop {
            select! {
                Ok(()) = signal::ctrl_c() => {
                    info!("Detected Ctrl+c signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Monitor for SIGTERM, SIGINT, and SIGQUIT.
                Some(_) = rx_term.recv() => {
                    info!("Detected SIGTERM signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_int.recv() => {
                    info!("Detected SIGINT signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_quit.recv() => {
                    info!("Detected SIGQUIT signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Handle incoming connections from peer
                Ok((stream, peer_address)) = self.incoming_peer_listener.accept() => {
                    // Return early if no incoming connections are accepted. Do
                    // not send application-handshake.
                    if self.global_state_lock.cli().disallow_all_incoming_peer_connections() {
                        warn!("Got incoming connection despite not accepting any. Ignoring");
                        continue;
                    }

                    let state = self.global_state_lock.lock_guard().await;
                    let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> = self.main_to_peer_broadcast_tx.subscribe();
                    let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = self.peer_task_to_main_tx.clone();
                    let own_handshake_data: HandshakeData = state.get_own_handshakedata();
                    let global_state_lock = self.global_state_lock.clone(); // bump arc refcount.
                    let incoming_peer_task_handle = tokio::task::Builder::new()
                        .name("answer_peer_wrapper")
                        .spawn(async move {
                        match answer_peer(
                            stream,
                            global_state_lock,
                            peer_address,
                            main_to_peer_broadcast_rx_clone,
                            peer_task_to_main_tx_clone,
                            own_handshake_data,
                        ).await {
                            Ok(()) => (),
                            Err(err) => error!("Got error: {:?}", err),
                        }
                    })?;
                    main_loop_state.task_handles.push(incoming_peer_task_handle);
                    // Garbage-collect handles of tasks that already finished.
                    main_loop_state.task_handles.retain(|th| !th.is_finished());
                }

                // Handle messages from peer tasks
                Some(msg) = peer_task_to_main_rx.recv() => {
                    debug!("Received message sent to main task.");
                    self.handle_peer_task_message(
                        msg,
                        &mut main_loop_state,
                    )
                    .await?
                }

                // Handle messages from miner task
                Some(main_message) = miner_to_main_rx.recv() => {
                    // A Some(exit_code) from the handler terminates the loop.
                    let exit_code = self.handle_miner_task_message(main_message, &mut main_loop_state).await?;

                    if let Some(exit_code) = exit_code {
                        break exit_code;
                    }

                }

                // Handle the completion of mempool tx-update jobs after new block.
                Some(ms_updated_transactions) = main_loop_state.update_mempool_receiver.recv() => {
                    self.handle_updated_mempool_txs(ms_updated_transactions).await;
                }

                // Handle messages from rpc server task
                Some(rpc_server_message) = rpc_server_to_main_rx.recv() => {
                    let shutdown_after_execution = self.handle_rpc_server_message(rpc_server_message.clone(), &mut main_loop_state).await?;
                    if shutdown_after_execution {
                        break SUCCESS_EXIT_CODE
                    }
                }

                // Handle peer discovery
                _ = &mut peer_discovery_timer => {
                    log_slow_scope!(fn_name!() + "::select::peer_discovery_timer");

                    // Check number of peers we are connected to and connect to more peers
                    // if needed.
                    debug!("Timer: peer discovery job");
                    self.prune_peers().await?;
                    self.reconnect(&mut main_loop_state).await?;
                    self.discover_peers(&mut main_loop_state).await?;

                    // Reset the timer to run this branch again in N seconds
                    peer_discovery_timer.as_mut().reset(tokio::time::Instant::now() + peer_discovery_timer_interval);
                }

                // Handle synchronization (i.e. batch-downloading of blocks)
                _ = &mut block_sync_timer => {
                    log_slow_scope!(fn_name!() + "::select::block_sync_timer");

                    trace!("Timer: block-synchronization job");
                    self.block_sync(&mut main_loop_state).await?;

                    // Reset the timer to run this branch again in M seconds
                    block_sync_timer.as_mut().reset(tokio::time::Instant::now() + block_sync_interval);
                }

                // Handle mempool cleanup, i.e. removing stale/too old txs from mempool
                _ = &mut mempool_cleanup_timer => {
                    log_slow_scope!(fn_name!() + "::select::mempool_cleanup_timer");

                    debug!("Timer: mempool-cleaner job");
                    self.global_state_lock.lock_guard_mut().await.mempool_prune_stale_transactions().await;

                    // Reset the timer to run this branch again in P seconds
                    mempool_cleanup_timer.as_mut().reset(tokio::time::Instant::now() + mempool_cleanup_interval);
                }

                // Handle incoming UTXO notification cleanup, i.e. removing stale/too old UTXO notification from pool
                _ = &mut utxo_notification_cleanup_timer => {
                    log_slow_scope!(fn_name!() + "::select::utxo_notification_cleanup_timer");

                    debug!("Timer: UTXO notification pool cleanup job");

                    // Danger: possible loss of funds.
                    //
                    // See description of prune_stale_expected_utxos().
                    //
                    // This call is disabled until such time as a thorough
                    // evaluation and perhaps reimplementation determines that
                    // it can be called safely without possible loss of funds.
                    // self.global_state_lock.lock_mut(|s| s.wallet_state.prune_stale_expected_utxos()).await;

                    utxo_notification_cleanup_timer.as_mut().reset(tokio::time::Instant::now() + utxo_notification_cleanup_interval);
                }

                // Handle membership proof resynchronization
                _ = &mut mp_resync_timer => {
                    log_slow_scope!(fn_name!() + "::select::mp_resync_timer");

                    debug!("Timer: Membership proof resync job");
                    self.global_state_lock.resync_membership_proofs().await?;

                    mp_resync_timer.as_mut().reset(tokio::time::Instant::now() + mp_resync_interval);
                }

                // Check if it's time to run the proof upgrader
                _ = &mut tx_proof_upgrade_timer => {
                    log_slow_scope!(fn_name!() + "::select::tx_upgrade_proof_timer");

                    trace!("Timer: tx-proof-upgrader");
                    self.proof_upgrader(&mut main_loop_state).await?;

                    tx_proof_upgrade_timer.as_mut().reset(tokio::time::Instant::now() + tx_proof_upgrade_interval);
                }

            }
        };

        self.graceful_shutdown(main_loop_state.task_handles).await?;
        info!("Shutdown completed.");

        Ok(exit_code)
    }
×
1695

1696
    /// Handle messages from the RPC server. Returns `true` iff the client should shut down
    /// after handling this message.
    async fn handle_rpc_server_message(
        &mut self,
        msg: RPCServerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<bool> {
        match msg {
            RPCServerToMain::BroadcastTx(transaction) => {
                debug!(
                    "`main` received following transaction from RPC Server. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    transaction.kernel.inputs.len(),
                    transaction.kernel.outputs.len(),
                    transaction.kernel.mutator_set_hash
                );

                // insert transaction into mempool
                // NOTE(review): a failed insertion panics the main loop here;
                // consider propagating with `?` instead — confirm which error
                // conditions mempool_insert can actually produce.
                self.global_state_lock
                    .lock_guard_mut()
                    .await
                    .mempool_insert(*transaction.clone(), TransactionOrigin::Own)
                    .await
                    .unwrap();

                // Is this a transaction we can share with peers? If so, share
                // it immediately.
                if let Ok(notification) = transaction.as_ref().try_into() {
                    self.main_to_peer_broadcast_tx
                        .send(MainToPeerTask::TransactionNotification(notification))?;
                } else {
                    // Otherwise, upgrade its proof quality, and share it by
                    // spinning up the proof upgrader.
                    let TransactionProof::Witness(primitive_witness) = transaction.proof else {
                        panic!("Expected Primitive witness. Got: {:?}", transaction.proof);
                    };

                    let vm_job_queue = self.global_state_lock.vm_job_queue().clone();

                    let proving_capability = self.global_state_lock.cli().proving_capability();
                    let upgrade_job =
                        UpgradeJob::from_primitive_witness(proving_capability, primitive_witness);

                    // note: handle_upgrade() hands off proving to the
                    //       triton-vm job queue and waits for job completion.
                    // note: handle_upgrade() broadcasts to peers on success.

                    let global_state_lock_clone = self.global_state_lock.clone();
                    let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
                    // The handle is intentionally discarded: unlike the
                    // scheduled upgrader, this task is fire-and-forget.
                    let _proof_upgrader_task = tokio::task::Builder::new()
                        .name("proof_upgrader")
                        .spawn(async move {
                        upgrade_job
                            .handle_upgrade(
                                &vm_job_queue,
                                TransactionOrigin::Own,
                                true,
                                global_state_lock_clone,
                                main_to_peer_broadcast_tx_clone,
                            )
                            .await
                    })?;

                    // main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);
                    // If transaction could not be shared immediately because
                    // it contains secret data, upgrade its proof-type.
                }

                // do not shut down
                Ok(false)
            }
            RPCServerToMain::BroadcastMempoolTransactions => {
                info!("Broadcasting transaction notifications for all shareable transactions in mempool");
                let state = self.global_state_lock.lock_guard().await;
                let txs = state.mempool.get_sorted_iter().collect_vec();
                for (txid, _) in txs {
                    // Since a read-lock is held over global state, the
                    // transaction must exist in the mempool.
                    let tx = state
                        .mempool
                        .get(txid)
                        .expect("Transaction from iter must exist in mempool");
                    let notification = TransactionNotification::try_from(tx);
                    match notification {
                        Ok(notification) => {
                            self.main_to_peer_broadcast_tx
                                .send(MainToPeerTask::TransactionNotification(notification))?;
                        }
                        Err(error) => {
                            // Non-shareable transactions are skipped, not fatal.
                            warn!("{error}");
                        }
                    };
                }
                Ok(false)
            }
            RPCServerToMain::ProofOfWorkSolution(new_block) => {
                info!("Handling PoW solution from RPC call");

                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
                Ok(false)
            }
            RPCServerToMain::PauseMiner => {
                info!("Received RPC request to stop miner");

                // NOTE(review): the send result is discarded here (and below);
                // presumably acceptable if the miner task may be absent — verify.
                self.main_to_miner_tx.send(MainToMiner::StopMining);
                Ok(false)
            }
            RPCServerToMain::RestartMiner => {
                info!("Received RPC request to start miner");
                self.main_to_miner_tx.send(MainToMiner::StartMining);
                Ok(false)
            }
            RPCServerToMain::Shutdown => {
                info!("Received RPC shutdown request.");

                // shut down
                Ok(true)
            }
        }
    }
×
1816

UNCOV
1817
    async fn graceful_shutdown(&mut self, task_handles: Vec<JoinHandle<()>>) -> Result<()> {
×
UNCOV
1818
        info!("Shutdown initiated.");
×
1819

1820
        // Stop mining
UNCOV
1821
        self.main_to_miner_tx.send(MainToMiner::Shutdown);
×
UNCOV
1822

×
UNCOV
1823
        // Send 'bye' message to all peers.
×
UNCOV
1824
        let _result = self
×
UNCOV
1825
            .main_to_peer_broadcast_tx
×
UNCOV
1826
            .send(MainToPeerTask::DisconnectAll());
×
UNCOV
1827
        debug!("sent bye");
×
1828

1829
        // Flush all databases
UNCOV
1830
        self.global_state_lock.flush_databases().await?;
×
1831

UNCOV
1832
        tokio::time::sleep(Duration::from_millis(50)).await;
×
1833

1834
        // Child processes should have finished by now. If not, abort them violently.
UNCOV
1835
        task_handles.iter().for_each(|jh| jh.abort());
×
UNCOV
1836

×
UNCOV
1837
        // wait for all to finish.
×
UNCOV
1838
        futures::future::join_all(task_handles).await;
×
1839

UNCOV
1840
        Ok(())
×
UNCOV
1841
    }
×
1842
}
1843

1844
#[cfg(test)]
1845
mod test {
1846
    use std::str::FromStr;
1847
    use std::time::UNIX_EPOCH;
1848

1849
    use tracing_test::traced_test;
1850

1851
    use super::*;
1852
    use crate::config_models::cli_args;
1853
    use crate::config_models::network::Network;
1854
    use crate::tests::shared::get_dummy_peer_incoming;
1855
    use crate::tests::shared::get_test_genesis_setup;
1856
    use crate::tests::shared::invalid_empty_block;
1857
    use crate::MINER_CHANNEL_CAPACITY;
1858

1859
    /// Fixture bundle produced by [`setup`]: the main-loop handler under test
    /// plus the test-side ends of all its channels.
    struct TestSetup {
        // Receiving end for messages from peer tasks to main.
        peer_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
        // Receiving end for messages from the miner task to main.
        miner_to_main_rx: mpsc::Receiver<MinerToMain>,
        // Receiving end for messages from the RPC server to main.
        rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
        // Handles passed into `MutableMainLoopState::new` by individual tests.
        task_join_handles: Vec<JoinHandle<()>>,
        // The object under test.
        main_loop_handler: MainLoopHandler,
        // Broadcast receiver observing what main sends to the peer tasks.
        main_to_peer_rx: broadcast::Receiver<MainToPeerTask>,
    }
1867

1868
    /// Build a test fixture with a genesis-state node, `num_init_peers_outgoing`
    /// outgoing peer connections and `num_peers_incoming` dummy incoming peers.
    async fn setup(num_init_peers_outgoing: u8, num_peers_incoming: u8) -> TestSetup {
        const CHANNEL_CAPACITY_MINER_TO_MAIN: usize = 10;

        let network = Network::Main;
        let (
            main_to_peer_tx,
            main_to_peer_rx,
            peer_to_main_tx,
            peer_to_main_rx,
            mut state,
            _own_handshake_data,
        ) = get_test_genesis_setup(network, num_init_peers_outgoing, cli_args::Args::default())
            .await
            .unwrap();
        assert!(
            state
                .lock_guard()
                .await
                .net
                .peer_map
                .iter()
                .all(|(_addr, peer)| peer.connection_is_outbound()),
            "Test assumption: All initial peers must represent outgoing connections."
        );

        // Inject dummy incoming peers at synthetic, distinct addresses.
        for i in 0..num_peers_incoming {
            let peer_address =
                std::net::SocketAddr::from_str(&format!("255.254.253.{}:8080", i)).unwrap();
            state
                .lock_guard_mut()
                .await
                .net
                .peer_map
                .insert(peer_address, get_dummy_peer_incoming(peer_address));
        }

        // Port 0: let the OS pick a free port so tests don't collide.
        let incoming_peer_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

        let (main_to_miner_tx, _main_to_miner_rx) =
            mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
        let (_miner_to_main_tx, miner_to_main_rx) =
            mpsc::channel::<MinerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
        let (_rpc_server_to_main_tx, rpc_server_to_main_rx) =
            mpsc::channel::<RPCServerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);

        let main_loop_handler = MainLoopHandler::new(
            incoming_peer_listener,
            state,
            main_to_peer_tx,
            peer_to_main_tx,
            main_to_miner_tx,
        );

        // Tests start with no background tasks.
        let task_join_handles = vec![];

        TestSetup {
            miner_to_main_rx,
            peer_to_main_rx,
            rpc_server_to_main_rx,
            task_join_handles,
            main_loop_handler,
            main_to_peer_rx,
        }
    }
10✔
1932

1933
    #[tokio::test]
1934
    async fn handle_self_guessed_block_new_tip() {
1✔
1935
        // A new tip is registered by main_loop. Verify correct state update.
1✔
1936
        let test_setup = setup(1, 0).await;
1✔
1937
        let TestSetup {
1✔
1938
            task_join_handles,
1✔
1939
            mut main_loop_handler,
1✔
1940
            mut main_to_peer_rx,
1✔
1941
            ..
1✔
1942
        } = test_setup;
1✔
1943
        let cli_args = main_loop_handler.global_state_lock.cli();
1✔
1944
        let mut mutable_main_loop_state = MutableMainLoopState::new(cli_args, task_join_handles);
1✔
1945

1✔
1946
        let network = main_loop_handler.global_state_lock.cli().network;
1✔
1947
        let block1 = invalid_empty_block(&Block::genesis(network));
1✔
1948

1✔
1949
        assert!(
1✔
1950
            main_loop_handler
1✔
1951
                .global_state_lock
1✔
1952
                .lock_guard()
1✔
1953
                .await
1✔
1954
                .chain
1✔
1955
                .light_state()
1✔
1956
                .header()
1✔
1957
                .height
1✔
1958
                .is_genesis(),
1✔
1959
            "Tip must be genesis prior to handling of new block"
1✔
1960
        );
1✔
1961

1✔
1962
        let block1 = Box::new(block1);
1✔
1963
        main_loop_handler
1✔
1964
            .handle_self_guessed_block(&mut mutable_main_loop_state, block1.clone())
1✔
1965
            .await
1✔
1966
            .unwrap();
1✔
1967
        let new_block_height: u64 = main_loop_handler
1✔
1968
            .global_state_lock
1✔
1969
            .lock_guard()
1✔
1970
            .await
1✔
1971
            .chain
1✔
1972
            .light_state()
1✔
1973
            .header()
1✔
1974
            .height
1✔
1975
            .into();
1✔
1976
        assert_eq!(
1✔
1977
            1u64, new_block_height,
1✔
1978
            "Tip height must be 1 after handling of new block"
1✔
1979
        );
1✔
1980
        let msg_to_peer_loops = main_to_peer_rx.recv().await.unwrap();
1✔
1981
        if let MainToPeerTask::Block(block_to_peers) = msg_to_peer_loops {
1✔
1982
            assert_eq!(
1✔
1983
                block1, block_to_peers,
1✔
1984
                "Peer loops must have received block 1"
1✔
1985
            );
1✔
1986
        } else {
1✔
1987
            panic!("Must have sent block notification to peer loops")
1✔
1988
        }
1✔
1989
    }
1✔
1990

1991
    /// Tests of the main loop's block-synchronization logic: the shape of
    /// batch block requests and abandonment of sync mode on timeout.
    mod sync_mode {
        use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
        use test_strategy::proptest;

        use super::*;
        use crate::tests::shared::get_dummy_socket_address;

        // Property test: the candidate-height invariants must hold for any
        // plausible own tip height.
        #[proptest]
        fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) {
            batch_request_heights_sanity(own_height);
        }

        // Fixed-height variant of the property test above.
        #[test]
        fn batch_request_heights_unit() {
            let own_height = 1_000_000u64;
            batch_request_heights_sanity(own_height);
        }

        /// Shared assertions on the list of UCA (uniquely-common-ancestor)
        /// candidate heights produced for a batch block request: sorted
        /// high-to-low, duplicate-free, starting at our own tip height and
        /// ending at genesis.
        fn batch_request_heights_sanity(own_height: u64) {
            let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into());

            // Reversing must yield an ascending list, i.e. the original is
            // descending.
            let mut heights_rev = heights.clone();
            heights_rev.reverse();
            assert!(
                heights_rev.is_sorted(),
                "Heights must be sorted from high-to-low"
            );

            // `dedup` removes adjacent duplicates; on a sorted list this
            // catches all duplicates, so the length must be unchanged.
            heights_rev.dedup();
            assert_eq!(heights_rev.len(), heights.len(), "duplicates");

            assert_eq!(heights[0], own_height.into(), "starts with own tip height");
            assert!(
                heights.last().unwrap().is_genesis(),
                "ends with genesis block"
            );
        }

        // Sync mode must be entered, survive while fresh, and be abandoned
        // once the global synchronization timeout has elapsed without
        // progress.
        #[tokio::test]
        #[traced_test]
        async fn sync_mode_abandoned_on_global_timeout() {
            let num_outgoing_connections = 0;
            let num_incoming_connections = 0;
            let test_setup = setup(num_outgoing_connections, num_incoming_connections).await;
            let TestSetup {
                task_join_handles,
                mut main_loop_handler,
                ..
            } = test_setup;

            let cli_args = main_loop_handler.global_state_lock.cli();
            let mut mutable_main_loop_state =
                MutableMainLoopState::new(cli_args, task_join_handles);

            // With no sync anchor set, block_sync is a no-op and must not
            // error.
            main_loop_handler
                .block_sync(&mut mutable_main_loop_state)
                .await
                .expect("Must return OK when no sync mode is set");

            // Mock that we are in a valid sync state
            let claimed_max_height = 1_000u64.into();
            let claimed_max_pow = ProofOfWork::new([100; 6]);
            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .net
                .sync_anchor = Some(SyncAnchor::new(
                claimed_max_pow,
                MmrAccumulator::new_from_leafs(vec![]),
            ));
            // A peer claiming the anchor's height/pow must be registered for
            // sync to proceed.
            mutable_main_loop_state.sync_state.peer_sync_states.insert(
                get_dummy_socket_address(0),
                PeerSynchronizationState::new(claimed_max_height, claimed_max_pow),
            );

            // Remember the anchor's `updated` timestamp to verify it is not
            // touched by a no-progress sync round.
            let sync_start_time = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .net
                .sync_anchor
                .as_ref()
                .unwrap()
                .updated;
            main_loop_handler
                .block_sync(&mut mutable_main_loop_state)
                .await
                .expect("Must return OK when sync mode has not timed out yet");
            assert!(
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .sync_anchor
                    .is_some(),
                "Sync mode must still be set before timeout has occurred"
            );

            assert_eq!(
                sync_start_time,
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .sync_anchor
                    .as_ref()
                    .unwrap()
                    .updated,
                "timestamp may not be updated without state change"
            );

            // Mock that sync-mode has timed out
            main_loop_handler = main_loop_handler.with_mocked_time(
                SystemTime::now()
                    + Duration::from_secs(GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS + 1),
            );

            main_loop_handler
                .block_sync(&mut mutable_main_loop_state)
                .await
                .expect("Must return OK when sync mode has timed out");
            // The anchor must have been cleared, i.e. sync mode abandoned.
            assert!(
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .sync_anchor
                    .is_none(),
                "Sync mode must be unset on timeout"
            );
        }
    }
2127

2128
    /// Tests of the scheduled proof-upgrader task, which merges mempool
    /// transactions with weaker proofs (ProofCollection) into SingleProof
    /// transactions.
    mod proof_upgrader {
        use super::*;
        use crate::job_queue::triton_vm::TritonVmJobQueue;
        use crate::models::blockchain::transaction::Transaction;
        use crate::models::blockchain::transaction::TransactionProof;
        use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
        use crate::models::proof_abstractions::timestamp::Timestamp;
        use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;

        /// Build a transaction with no outputs (only a change output and the
        /// given fee), proven to the requested capability. Helper for seeding
        /// the mempool in the tests below.
        async fn tx_no_outputs(
            global_state_lock: &GlobalStateLock,
            tx_proof_type: TxProvingCapability,
            fee: NativeCurrencyAmount,
        ) -> Transaction {
            // Deterministic test key for the change output.
            let change_key = global_state_lock
                .lock_guard()
                .await
                .wallet_state
                .wallet_entropy
                .nth_generation_spending_key_for_tests(0);
            // Timestamp comfortably in the future relative to the tip.
            let in_seven_months = global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .header()
                .timestamp
                + Timestamp::months(7);

            let global_state = global_state_lock.lock_guard().await;
            global_state
                .create_transaction_with_prover_capability(
                    vec![].into(),
                    change_key.into(),
                    UtxoNotificationMedium::OffChain,
                    fee,
                    in_seven_months,
                    tx_proof_type,
                    &TritonVmJobQueue::dummy(),
                )
                .await
                .unwrap()
                .0
        }

        // A foreign ProofCollection transaction in the mempool must be
        // upgraded to SingleProof once the upgrade interval has elapsed, and
        // peers must be notified of the upgraded transaction.
        #[tokio::test]
        #[traced_test]
        async fn upgrade_proof_collection_to_single_proof_foreign_tx() {
            let num_outgoing_connections = 0;
            let num_incoming_connections = 0;
            let test_setup = setup(num_outgoing_connections, num_incoming_connections).await;
            let TestSetup {
                peer_to_main_rx,
                miner_to_main_rx,
                rpc_server_to_main_rx,
                task_join_handles,
                mut main_loop_handler,
                mut main_to_peer_rx,
            } = test_setup;

            // Force instance to create SingleProofs, otherwise CI and other
            // weak machines fail.
            let mocked_cli = cli_args::Args {
                tx_proving_capability: Some(TxProvingCapability::SingleProof),
                tx_proof_upgrade_interval: 100, // seconds
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            // Pin "now" so the upgrade-interval checks below are
            // deterministic.
            let mut main_loop_handler = main_loop_handler.with_mocked_time(SystemTime::now());
            let cli_args = main_loop_handler.global_state_lock.cli();
            let mut mutable_main_loop_state =
                MutableMainLoopState::new(cli_args, task_join_handles);

            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task returns OK when run on empty mempool"
            );

            // Insert a foreign ProofCollection transaction as upgrade
            // candidate.
            let fee = NativeCurrencyAmount::coins(1);
            let proof_collection_tx = tx_no_outputs(
                &main_loop_handler.global_state_lock,
                TxProvingCapability::ProofCollection,
                fee,
            )
            .await;

            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .mempool_insert(proof_collection_tx.clone(), TransactionOrigin::Foreign)
                .await
                .unwrap();

            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task returns OK when it's not yet time to upgrade"
            );

            // Before the interval elapses, the proof must be untouched.
            assert!(
                matches!(
                    main_loop_handler
                        .global_state_lock
                        .lock_guard()
                        .await
                        .mempool
                        .get(proof_collection_tx.kernel.txid())
                        .unwrap()
                        .proof,
                    TransactionProof::ProofCollection(_)
                ),
                "Proof in mempool must still be of type proof collection"
            );

            // Mock that enough time has passed to perform the upgrade. Then
            // perform the upgrade.
            let mut main_loop_handler =
                main_loop_handler.with_mocked_time(SystemTime::now() + Duration::from_secs(300));
            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task must return OK when it's time to upgrade"
            );

            // Wait for upgrade task to finish.
            let handle = mutable_main_loop_state.proof_upgrader_task.unwrap().await;
            assert!(
                handle.is_ok(),
                "Proof-upgrade task must finish successfully."
            );

            // At this point there should be one transaction in the mempool,
            // which is (if all is well) the merger of the ProofCollection
            // transaction inserted above and one of the upgrader's fee
            // gobblers. The point is that this transaction is a SingleProof
            // transaction, so test that.

            let (merged_txid, _) = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .mempool
                .get_sorted_iter()
                .next_back()
                .expect("mempool should contain one item here");

            assert!(
                matches!(
                    main_loop_handler
                        .global_state_lock
                        .lock_guard()
                        .await
                        .mempool
                        .get(merged_txid)
                        .unwrap()
                        .proof,
                    TransactionProof::SingleProof(_)
                ),
                "Proof in mempool must now be of type single proof"
            );

            // The upgrader must announce the upgraded transaction to peers.
            match main_to_peer_rx.recv().await {
                Ok(MainToPeerTask::TransactionNotification(tx_noti)) => {
                    assert_eq!(merged_txid, tx_noti.txid);
                    assert_eq!(TransactionProofQuality::SingleProof, tx_noti.proof_quality);
                }
                other => panic!("Must have sent transaction notification to peer loop after successful proof upgrade. Got:\n{other:?}"),
            }

            // These values are kept alive as the transmission-counterpart will
            // otherwise fail on `send`.
            drop(peer_to_main_rx);
            drop(miner_to_main_rx);
            drop(rpc_server_to_main_rx);
            drop(main_to_peer_rx);
        }
    }
2318

2319
    /// Tests of peer pruning and peer discovery: when connections are culled,
    /// when discovery runs, and when bootstrap nodes are considered.
    mod peer_discovery {
        use super::*;
        use crate::models::peer::bootstrap_info::BootstrapInfo;
        use crate::tests::shared::get_dummy_socket_address;

        // With more connections than `max_num_peers` allows, prune_peers must
        // request disconnection of the excess peers.
        #[tokio::test]
        #[traced_test]
        async fn prune_peers_too_many_connections() {
            let num_init_peers_outgoing = 10;
            let num_init_peers_incoming = 4;
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
            let TestSetup {
                mut main_to_peer_rx,
                mut main_loop_handler,
                ..
            } = test_setup;

            // Cap at the outgoing count, making the 4 incoming peers excess.
            let mocked_cli = cli_args::Args {
                max_num_peers: num_init_peers_outgoing as usize,
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            main_loop_handler.prune_peers().await.unwrap();
            // Expect exactly one Disconnect message per excess peer.
            assert_eq!(4, main_to_peer_rx.len());
            for _ in 0..4 {
                let peer_msg = main_to_peer_rx.recv().await.unwrap();
                assert!(matches!(peer_msg, MainToPeerTask::Disconnect(_)))
            }
        }

        // With the peer count below the limit, prune_peers must not
        // disconnect anyone.
        #[tokio::test]
        #[traced_test]
        async fn prune_peers_not_too_many_connections() {
            let num_init_peers_outgoing = 10;
            let num_init_peers_incoming = 1;
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
            let TestSetup {
                main_to_peer_rx,
                mut main_loop_handler,
                ..
            } = test_setup;

            let mocked_cli = cli_args::Args {
                max_num_peers: 200,
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            main_loop_handler.prune_peers().await.unwrap();
            // No disconnect (or other) messages may have been sent.
            assert!(main_to_peer_rx.is_empty());
        }

        // Discovery must be skipped entirely when the peer limit is already
        // met or exceeded.
        #[tokio::test]
        #[traced_test]
        async fn skip_peer_discovery_if_peer_limit_is_exceeded() {
            let num_init_peers_outgoing = 2;
            let num_init_peers_incoming = 0;
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
            let TestSetup {
                task_join_handles,
                mut main_loop_handler,
                ..
            } = test_setup;

            // max_num_peers = 0 means any connection count exceeds the limit.
            let mocked_cli = cli_args::Args {
                max_num_peers: 0,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let cli_args = main_loop_handler.global_state_lock.cli();
            main_loop_handler
                .discover_peers(&mut MutableMainLoopState::new(cli_args, task_join_handles))
                .await
                .unwrap();

            // Verified via the traced_test log capture.
            assert!(logs_contain("Skipping peer discovery."));
        }

        // With headroom below the limit, discovery must run and produce
        // messages for the peer loops.
        #[tokio::test]
        #[traced_test]
        async fn performs_peer_discovery_on_few_connections() {
            let num_init_peers_outgoing = 2;
            let num_init_peers_incoming = 0;
            let TestSetup {
                task_join_handles,
                mut main_loop_handler,
                mut main_to_peer_rx,
                peer_to_main_rx: _keep_channel_open,
                ..
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;

            // Set CLI to attempt to make more connections
            let mocked_cli = cli_args::Args {
                max_num_peers: 10,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let cli_args = main_loop_handler.global_state_lock.cli();
            main_loop_handler
                .discover_peers(&mut MutableMainLoopState::new(cli_args, task_join_handles))
                .await
                .unwrap();

            // Discovery must have asked peer loops to do something (e.g.
            // request peer lists).
            let peer_discovery_sent_messages_on_peer_channel = main_to_peer_rx.try_recv().is_ok();
            assert!(peer_discovery_sent_messages_on_peer_channel);
            assert!(logs_contain("Performing peer discovery"));
        }

        // A node with few connections must consider bootstrap nodes as
        // connection candidates.
        #[tokio::test]
        #[traced_test]
        async fn poorly_connected_node_tries_to_connect_to_bootstrap_node() {
            let mut global_state_lock = setup(1, 1).await.main_loop_handler.global_state_lock;
            let cli_args = cli_args::Args {
                max_num_peers: 100,
                ..Default::default()
            };
            global_state_lock.set_cli(cli_args).await;
            let (peer_candidate, peer_state) =
                set_up_peer_candidate_test(global_state_lock.clone()).await;

            let net_state = &global_state_lock.lock_guard().await.net;
            let chosen_candidate = peer_state.peer_candidate(net_state);
            assert_eq!(Some(peer_candidate), chosen_candidate);
        }

        // A well-connected node must leave bootstrap nodes alone.
        #[tokio::test]
        #[traced_test]
        async fn well_connected_node_does_not_try_to_connect_to_bootstrap_nodes() {
            let mut global_state_lock = setup(20, 20).await.main_loop_handler.global_state_lock;
            let cli_args = cli_args::Args {
                max_num_peers: 42,
                ..Default::default()
            };
            global_state_lock.set_cli(cli_args).await;
            let (_, peer_state) = set_up_peer_candidate_test(global_state_lock.clone()).await;

            let net_state = &global_state_lock.lock_guard().await.net;
            let chosen_candidate = peer_state.peer_candidate(net_state);
            assert!(chosen_candidate.is_none());
        }

        /// Create a [PotentialPeersState] and [add](PotentialPeersState::add)
        /// one [peer candidate](PeerCandidate) that is a
        /// [bootstrap node](BootstrapStatus::Bootstrap). The added peer
        /// candidate as well as the resulting [PotentialPeersState] are
        /// returned.
        ///
        ///   * acquires `global_state_lock` for write
        async fn set_up_peer_candidate_test(
            mut global_state_lock: GlobalStateLock,
        ) -> (PeerCandidate, PotentialPeersState) {
            // try to avoid collisions: peer ids are implicit in the setup
            let peer_id = 255;
            let peer_socket_address = get_dummy_socket_address(peer_id as u8);
            let peer_bootstrap_info = BootstrapInfo::new(BootstrapStatus::Bootstrap);
            global_state_lock
                .lock_guard_mut()
                .await
                .net
                .bootstrap_status
                .insert(peer_socket_address, peer_bootstrap_info);

            let peer_candidate = PeerCandidate::new(peer_socket_address, peer_id, 1);
            let mut peer_state = PotentialPeersState::new(global_state_lock.cli());
            peer_state.add(peer_candidate);
            (peer_candidate, peer_state)
        }
    }
2502

2503
    /// `SystemTime` implements `Ord` chronologically: an earlier instant
    /// compares as smaller, so taking the minimum selects the oldest
    /// timestamp regardless of element order. Documents the assumption used
    /// when picking the longest-lived peer connection.
    #[test]
    fn older_systemtime_ranks_first() {
        let start = UNIX_EPOCH;
        let other = UNIX_EPOCH + Duration::from_secs(1000);
        let mut instants = [start, other];

        // `SystemTime: Ord`, so plain `min()` is equivalent to — and clearer
        // than — `min_by(|l, r| l.cmp(r))`.
        assert_eq!(start, instants.iter().copied().min().unwrap());

        // The result must not depend on element order.
        instants.reverse();

        assert_eq!(start, instants.iter().copied().min().unwrap());
    }
2521

2522
    /// Tests of bootstrap-mode behavior: a bootstrap node at its connection
    /// limit makes room for a new inbound peer by dropping its longest-lived
    /// connection.
    mod bootstrap_mode {
        use rand::Rng;

        use super::*;
        use crate::models::peer::PeerMessage;
        use crate::models::peer::TransferConnectionStatus;
        use crate::tests::shared::get_dummy_peer_connection_data_genesis;
        use crate::tests::shared::to_bytes;

        #[tokio::test]
        #[traced_test]
        async fn disconnect_from_oldest_peer_upon_connection_request() {
            // Set up a node in bootstrap mode and connected to a given
            // number of peers, which is one less than the maximum. Initiate a
            // connection request. Verify that the oldest of the existing
            // connections is dropped.

            let network = Network::Main;
            let num_init_peers_outgoing = 5;
            let num_init_peers_incoming = 0;
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
            let TestSetup {
                mut peer_to_main_rx,
                task_join_handles,
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = test_setup;

            // Bootstrap mode with exactly one free connection slot.
            let mocked_cli = cli_args::Args {
                max_num_peers: usize::from(num_init_peers_outgoing) + 1,
                bootstrap: true,
                network,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            let cli_args = main_loop_handler.global_state_lock.cli();
            let mut mutable_main_loop_state =
                MutableMainLoopState::new(cli_args, task_join_handles);

            // check sanity: at startup, we are connected to the initial number of peers
            assert_eq!(
                usize::from(num_init_peers_outgoing),
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .peer_map
                    .len()
            );

            // randomize "connection established" timestamps
            let random_offset = || {
                let unix_epoch_to_now_in_millis = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap()
                    .as_millis() as u64;
                Duration::from_millis(rand::rng().random_range(0..unix_epoch_to_now_in_millis))
            };
            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .net
                .peer_map
                .iter_mut()
                .for_each(|(_, peer_info)| {
                    peer_info.set_connection_established(UNIX_EPOCH + random_offset());
                });

            // compute which peer will be dropped, for later reference
            let expected_drop_peer_socket_address = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .net
                .peer_map
                .iter()
                .min_by(|(_, left), (_, right)| {
                    left.connection_established()
                        .cmp(&right.connection_established())
                })
                .map(|(socket_address, _)| socket_address)
                .copied()
                .unwrap();

            // simulate incoming connection
            let (peer_handshake_data, peer_socket_address) =
                get_dummy_peer_connection_data_genesis(network, 1);
            let own_handshake_data = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .get_own_handshakedata();
            assert_eq!(peer_handshake_data.network, own_handshake_data.network);
            assert_eq!(peer_handshake_data.version, own_handshake_data.version);
            // Script the exact byte exchange of a successful handshake on a
            // mock TCP stream: inbound handshake request, our response, and
            // our "Accepted" status.
            let mock_stream = tokio_test::io::Builder::new()
                .read(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_REQUEST.to_vec(),
                        peer_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_RESPONSE.to_vec(),
                        own_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::ConnectionStatus(
                        TransferConnectionStatus::Accepted,
                    ))
                    .unwrap(),
                )
                .build();
            let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone();
            let global_state_lock_clone = main_loop_handler.global_state_lock.clone();
            let main_to_peer_rx_clone = main_loop_handler.main_to_peer_broadcast_tx.subscribe();
            // Run the peer-answering logic concurrently, as the real node
            // would.
            let incoming_peer_task_handle = tokio::task::Builder::new()
                .name("answer_peer_wrapper")
                .spawn(async move {
                    answer_peer(
                        mock_stream,
                        global_state_lock_clone,
                        peer_socket_address,
                        main_to_peer_rx_clone,
                        peer_to_main_tx_clone,
                        own_handshake_data,
                    )
                    .await
                    .unwrap();
                })
                .unwrap();

            // `answer_peer_wrapper` should send a
            // `DisconnectFromLongestLivedPeer` message to main
            let peer_to_main_message = peer_to_main_rx.recv().await.unwrap();
            assert!(matches!(
                peer_to_main_message,
                PeerTaskToMain::DisconnectFromLongestLivedPeer,
            ));

            // process this message
            main_loop_handler
                .handle_peer_task_message(peer_to_main_message, &mut mutable_main_loop_state)
                .await
                .unwrap();

            // main loop should send a `Disconnect` message
            let main_to_peers_message = main_to_peer_rx.recv().await.unwrap();
            let MainToPeerTask::Disconnect(observed_drop_peer_socket_address) =
                main_to_peers_message
            else {
                panic!("Expected disconnect, got {main_to_peers_message:?}");
            };

            // matched observed droppee against expectation
            assert_eq!(
                expected_drop_peer_socket_address,
                observed_drop_peer_socket_address,
            );
            println!("Dropped connection with {expected_drop_peer_socket_address}.");

            // don't forget to terminate the peer task, which is still running
            incoming_peer_task_handle.abort();
        }
    }
2697
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc