
Neptune-Crypto / neptune-core, build 15027636480
14 May 2025 05:53PM UTC. Coverage: 71.729% (-0.02%) from 71.744%.

Pull Request #592: Bootstrap with downloaded blocks
Merge e1c7119d6 into 825a3b2f8 (committed via GitHub web-flow)

83 of 124 new or added lines in 6 files covered (66.94%).
19 existing lines in 4 files now uncovered.
20051 of 27954 relevant lines covered (71.73%).
381258.36 hits per line.

Source file: /src/main_loop.rs (61.03% of lines covered)
pub mod proof_upgrader;

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use anyhow::Result;
use itertools::Itertools;
use proof_upgrader::get_upgrade_task_from_mempool;
use proof_upgrader::UpdateMutatorSetDataJob;
use proof_upgrader::UpgradeJob;
use rand::prelude::IteratorRandom;
use rand::seq::IndexedRandom;
use tokio::net::TcpListener;
use tokio::select;
use tokio::signal;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time;
use tokio::time::Instant;
use tokio::time::MissedTickBehavior;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use crate::connect_to_peers::answer_peer;
use crate::connect_to_peers::call_peer;
use crate::macros::fn_name;
use crate::macros::log_slow_scope;
use crate::models::blockchain::block::block_header::BlockHeader;
use crate::models::blockchain::block::block_height::BlockHeight;
use crate::models::blockchain::block::difficulty_control::ProofOfWork;
use crate::models::blockchain::block::Block;
use crate::models::blockchain::transaction::Transaction;
use crate::models::blockchain::transaction::TransactionProof;
use crate::models::channel::MainToMiner;
use crate::models::channel::MainToPeerTask;
use crate::models::channel::MainToPeerTaskBatchBlockRequest;
use crate::models::channel::MinerToMain;
use crate::models::channel::PeerTaskToMain;
use crate::models::channel::RPCServerToMain;
use crate::models::peer::handshake_data::HandshakeData;
use crate::models::peer::peer_info::PeerInfo;
use crate::models::peer::transaction_notification::TransactionNotification;
use crate::models::peer::PeerSynchronizationState;
use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
use crate::models::state::block_proposal::BlockProposal;
use crate::models::state::mempool::TransactionOrigin;
use crate::models::state::networking_state::SyncAnchor;
use crate::models::state::tx_proving_capability::TxProvingCapability;
use crate::models::state::GlobalState;
use crate::models::state::GlobalStateLock;
use crate::triton_vm_job_queue::vm_job_queue;
use crate::triton_vm_job_queue::TritonVmJobPriority;
use crate::triton_vm_job_queue::TritonVmJobQueue;
use crate::SUCCESS_EXIT_CODE;

const PEER_DISCOVERY_INTERVAL: Duration = Duration::from_secs(2 * 60);
const SYNC_REQUEST_INTERVAL: Duration = Duration::from_secs(3);
const MEMPOOL_PRUNE_INTERVAL: Duration = Duration::from_secs(30 * 60);
const MP_RESYNC_INTERVAL: Duration = Duration::from_secs(59);
const EXPECTED_UTXOS_PRUNE_INTERVAL: Duration = Duration::from_secs(19 * 60);

/// Interval at which the transaction-upgrade checker runs. Note that this
/// does *not* define how often a transaction-proof upgrade is actually
/// performed, only how often we check whether we are ready to perform one.
const TRANSACTION_UPGRADE_CHECK_INTERVAL: Duration = Duration::from_secs(60);

const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;

/// Duration within which an individual peer is expected to respond to a
/// synchronization request.
const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT: Duration =
    Duration::from_secs(SYNC_REQUEST_INTERVAL.as_secs() * SANCTION_PEER_TIMEOUT_FACTOR);

/// Maximum duration that synchronization may run without making any progress.
const GLOBAL_SYNCHRONIZATION_TIMEOUT: Duration =
    Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT.as_secs() * 4);
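// For concreteness, the values implied by the constants above: a peer that
// has not answered a sync request within 3 s * 40 = 120 s is considered
// unresponsive, and a synchronization effort that makes no progress for
// 4 * 120 s = 480 s is abandoned entirely.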

const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;
const TX_UPDATER_CHANNEL_CAPACITY: usize = 1;

/// Wraps a transmission channel.
///
/// To be used for the transmission channel to the miner, because
///  a) the miner might not exist, in which case there would be no-one to
///     empty the channel; and
///  b) contrary to other channels, transmission failures here are not critical.
#[derive(Debug)]
struct MainToMinerChannel(Option<mpsc::Sender<MainToMiner>>);

impl MainToMinerChannel {
    /// Send a message to the miner task (if any).
    fn send(&self, message: MainToMiner) {
        // Do not use the async `send` function because it blocks until there
        // is spare capacity on the channel. Messages to the miner are not
        // critical, so if there is no capacity left, just log an error
        // message.
        if let Some(channel) = &self.0 {
            if let Err(e) = channel.try_send(message) {
                error!("Failed to send pause message to miner thread:\n{e}");
            }
        }
    }
}

/// The immutable part of the input to the main loop function.
#[derive(Debug)]
pub struct MainLoopHandler {
    incoming_peer_listener: TcpListener,
    global_state_lock: GlobalStateLock,

    // note: broadcast::Sender::send() does not block
    main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,

    // note: mpsc::Sender::send() blocks if the channel is full;
    // locks should not be held across it.
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,

    // note: MainToMinerChannel::send() does not block; it might log an error.
    main_to_miner_tx: MainToMinerChannel,

    peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
    miner_to_main_rx: mpsc::Receiver<MinerToMain>,
    rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
    task_handles: Vec<JoinHandle<()>>,

    #[cfg(test)]
    mock_now: Option<SystemTime>,
}

/// The mutable part of the main loop function's state.
struct MutableMainLoopState {
    /// Information used to batch-download blocks.
    sync_state: SyncState,

    /// Information about potential peers for new connections.
    potential_peers: PotentialPeersState,

    /// A list of join-handles to spawned tasks.
    task_handles: Vec<JoinHandle<()>>,

    /// A join-handle to a task performing transaction-proof upgrades.
    proof_upgrader_task: Option<JoinHandle<()>>,

    /// A join-handle to a task running the update of the mempool transactions.
    update_mempool_txs_handle: Option<JoinHandle<()>>,

    /// A channel that the task updating mempool transactions can use to
    /// communicate its result.
    update_mempool_receiver: mpsc::Receiver<Vec<Transaction>>,
}

impl MutableMainLoopState {
    fn new(task_handles: Vec<JoinHandle<()>>) -> Self {
        let (_dummy_sender, dummy_receiver) =
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);
        Self {
            sync_state: SyncState::default(),
            potential_peers: PotentialPeersState::default(),
            task_handles,
            proof_upgrader_task: None,
            update_mempool_txs_handle: None,
            update_mempool_receiver: dummy_receiver,
        }
    }
}

/// Handles batch-downloading of blocks if we are more than n blocks behind.
#[derive(Default, Debug)]
struct SyncState {
    peer_sync_states: HashMap<SocketAddr, PeerSynchronizationState>,
    last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>,
}

impl SyncState {
    fn record_request(
        &mut self,
        requested_block_height: BlockHeight,
        peer: SocketAddr,
        now: SystemTime,
    ) {
        self.last_sync_request = Some((now, requested_block_height, peer));
    }

    /// Return a list of peers that have reported to be in possession of
    /// blocks with a PoW above a threshold.
    fn get_potential_peers_for_sync_request(&self, threshold_pow: ProofOfWork) -> Vec<SocketAddr> {
        self.peer_sync_states
            .iter()
            .filter(|(_sa, sync_state)| sync_state.claimed_max_pow > threshold_pow)
            .map(|(sa, _)| *sa)
            .collect()
    }

    /// Determine if a peer should be sanctioned for failing to respond to a
    /// synchronization request fast enough. Also determine if a new request
    /// should be made or the previous one should be allowed to run for longer.
    ///
    /// Returns (peer to be sanctioned, attempt new request).
    fn get_status_of_last_request(
        &self,
        current_block_height: BlockHeight,
        now: SystemTime,
    ) -> (Option<SocketAddr>, bool) {
        // A peer is sanctioned if no answer has been received after N times
        // the sync request interval.
        match self.last_sync_request {
            None => {
                // No sync request has been made since startup of the program.
                (None, true)
            }
            Some((req_time, requested_height, peer_sa)) => {
                if requested_height < current_block_height {
                    // The last sync request updated the state.
                    (None, true)
                } else if req_time + INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT < now {
                    // The last sync request was not answered; sanction the
                    // peer and make a new sync request.
                    (Some(peer_sa), true)
                } else {
                    // The last sync request has not yet been answered, but it
                    // has not timed out yet.
                    (None, false)
                }
            }
        }
    }
}
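// Worked example (illustrative; the numbers follow from the constants above):
// suppose a batch was requested at time T and the tip height has not changed
// since. Calling `get_status_of_last_request` before T + 120 s yields
// (None, false): keep waiting. Calling it after T + 120 s yields
// (Some(peer), true): sanction the unresponsive peer and issue a fresh
// request to another sync candidate.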

/// Holds information about a potential peer in the process of peer discovery.
struct PotentialPeerInfo {
    _reported: SystemTime,
    _reported_by: SocketAddr,
    instance_id: u128,
    distance: u8,
}

impl PotentialPeerInfo {
    fn new(reported_by: SocketAddr, instance_id: u128, distance: u8, now: SystemTime) -> Self {
        Self {
            _reported: now,
            _reported_by: reported_by,
            instance_id,
            distance,
        }
    }
}

/// Holds information about a set of potential peers in the process of peer
/// discovery.
struct PotentialPeersState {
    potential_peers: HashMap<SocketAddr, PotentialPeerInfo>,
}

impl PotentialPeersState {
    fn default() -> Self {
        Self {
            potential_peers: HashMap::new(),
        }
    }

    fn add(
        &mut self,
        reported_by: SocketAddr,
        potential_peer: (SocketAddr, u128),
        max_peers: usize,
        distance: u8,
        now: SystemTime,
    ) {
        let potential_peer_socket_address = potential_peer.0;
        let potential_peer_instance_id = potential_peer.1;

        // This check *should* make it likely that a potential peer is always
        // registered with the lowest observed distance.
        if self
            .potential_peers
            .contains_key(&potential_peer_socket_address)
        {
            return;
        }

        // If this data structure is full, remove a random entry. Then add this.
        if self.potential_peers.len()
            > max_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS
        {
            let mut rng = rand::rng();
            let random_potential_peer = self
                .potential_peers
                .keys()
                .choose(&mut rng)
                .unwrap()
                .to_owned();
            self.potential_peers.remove(&random_potential_peer);
        }

        let insert_value =
            PotentialPeerInfo::new(reported_by, potential_peer_instance_id, distance, now);
        self.potential_peers
            .insert(potential_peer_socket_address, insert_value);
    }

    /// Return a peer from the potential peer list that we aren't connected to
    /// and that isn't our own address.
    ///
    /// Favors peers with a high distance and with IPs that we are not already
    /// connected to.
    ///
    /// Returns (socket address, peer distance).
    fn get_candidate(
        &self,
        connected_clients: &[PeerInfo],
        own_instance_id: u128,
    ) -> Option<(SocketAddr, u8)> {
        let peers_instance_ids: Vec<u128> =
            connected_clients.iter().map(|x| x.instance_id()).collect();

        // Only pick those peers that report a listening port.
        let peers_listen_addresses: Vec<SocketAddr> = connected_clients
            .iter()
            .filter_map(|x| x.listen_address())
            .collect();

        // Find the appropriate candidates.
        let candidates = self
            .potential_peers
            .iter()
            // Prevent connecting to self. Note that we *only* use the instance
            // ID to prevent this, meaning this will allow multiple nodes, e.g.
            // running on the same computer, to form a complete graph.
            .filter(|pp| pp.1.instance_id != own_instance_id)
            // Prevent connecting to a peer we are already connected to.
            .filter(|potential_peer| !peers_instance_ids.contains(&potential_peer.1.instance_id))
            .filter(|potential_peer| !peers_listen_addresses.contains(potential_peer.0))
            .collect::<Vec<_>>();

        // Prefer candidates with IPs that we are not already connected to, but
        // connect to repeated IPs in case we don't have other options, as
        // repeated IPs may just be multiple machines on the same NAT'ed IPv4
        // address.
        let mut connected_ips = peers_listen_addresses.into_iter().map(|x| x.ip());
        let candidates = if candidates
            .iter()
            .any(|candidate| !connected_ips.contains(&candidate.0.ip()))
        {
            candidates
                .into_iter()
                .filter(|candidate| !connected_ips.contains(&candidate.0.ip()))
                .collect()
        } else {
            candidates
        };

        // Get the candidate list with the highest distance.
        let max_distance_candidates = candidates.iter().max_by_key(|pp| pp.1.distance);

        // Pick a random candidate from the appropriate candidates.
        let mut rng = rand::rng();
        max_distance_candidates
            .iter()
            .choose(&mut rng)
            .map(|x| (x.0.to_owned(), x.1.distance))
    }
}
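// Selection sketch (illustrative): given candidates at distances {2, 3, 3},
// all on previously unseen IPs, `get_candidate` keeps all three and then
// narrows to the maximum distance. Since `max_by_key` returns at most one
// element, the final random choice is over that single remaining candidate.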

/// Return a boolean indicating whether the client should stay in
/// synchronization mode.
fn stay_in_sync_mode(
    own_block_tip_header: &BlockHeader,
    sync_state: &SyncState,
    sync_mode_threshold: usize,
) -> bool {
    let max_claimed_pow = sync_state
        .peer_sync_states
        .values()
        .max_by_key(|x| x.claimed_max_pow);
    match max_claimed_pow {
        None => false, // No peer has passed the sync challenge phase.

        // Synchronization mode is left once the number of remaining blocks
        // drops to half of what has been indicated to fit into RAM.
        Some(max_claim) => {
            own_block_tip_header.cumulative_proof_of_work < max_claim.claimed_max_pow
                && max_claim.claimed_max_height - own_block_tip_header.height
                    > sync_mode_threshold as i128 / 2
        }
    }
}
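// Worked example (illustrative numbers): with `sync_mode_threshold` = 1000,
// a node whose tip trails the strongest claimed tip by 600 blocks stays in
// sync mode (600 > 1000 / 2); once the gap shrinks to 500 blocks or fewer,
// the function returns false and sync mode is exited.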

impl MainLoopHandler {
    // todo: find a way to avoid triggering lint
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        incoming_peer_listener: TcpListener,
        global_state_lock: GlobalStateLock,
        main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
        peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
        main_to_miner_tx: mpsc::Sender<MainToMiner>,

        peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
        miner_to_main_rx: mpsc::Receiver<MinerToMain>,
        rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
        task_handles: Vec<JoinHandle<()>>,
    ) -> Self {
        let maybe_main_to_miner_tx = if global_state_lock.cli().mine() {
            Some(main_to_miner_tx)
        } else {
            None
        };
        Self {
            incoming_peer_listener,
            global_state_lock,
            main_to_miner_tx: MainToMinerChannel(maybe_main_to_miner_tx),
            main_to_peer_broadcast_tx,
            peer_task_to_main_tx,

            peer_task_to_main_rx,
            miner_to_main_rx,
            rpc_server_to_main_rx,
            task_handles,

            #[cfg(test)]
            mock_now: None,
        }
    }

    pub fn global_state_lock(&mut self) -> GlobalStateLock {
        self.global_state_lock.clone()
    }

    /// Allows for mocked timestamps such that time dependencies may be tested.
    #[cfg(test)]
    fn with_mocked_time(mut self, mocked_time: SystemTime) -> Self {
        self.mock_now = Some(mocked_time);
        self
    }

    fn now(&self) -> SystemTime {
        #[cfg(not(test))]
        {
            SystemTime::now()
        }
        #[cfg(test)]
        {
            self.mock_now.unwrap_or(SystemTime::now())
        }
    }

    /// Run a list of Triton VM prover jobs that update the mutator set state
    /// for transactions.
    ///
    /// Sends the result back through the provided channel.
    async fn update_mempool_jobs(
        update_jobs: Vec<UpdateMutatorSetDataJob>,
        job_queue: Arc<TritonVmJobQueue>,
        transaction_update_sender: mpsc::Sender<Vec<Transaction>>,
        proof_job_options: TritonVmProofJobOptions,
    ) {
        debug!(
            "Attempting to update transaction proofs of {} transactions",
            update_jobs.len()
        );
        let mut result = vec![];
        for job in update_jobs {
            // Jobs for updating txs in the mempool have highest priority since
            // they block the composer from continuing.
            // TODO: Handle errors better here.
            let job_result = job
                .upgrade(job_queue.clone(), proof_job_options.clone())
                .await
                .unwrap();
            result.push(job_result);
        }

        transaction_update_sender
            .send(result)
            .await
            .expect("Receiver for updated txs in main loop must still exist");
    }

    /// Handles a list of transactions whose proofs have been updated with new
    /// mutator set data.
    async fn handle_updated_mempool_txs(&mut self, updated_txs: Vec<Transaction>) {
        // Update the mempool with the updated transactions.
        {
            let mut state = self.global_state_lock.lock_guard_mut().await;
            for updated in &updated_txs {
                let txid = updated.kernel.txid();
                if let Some(tx) = state.mempool.get_mut(txid) {
                    *tx = updated.to_owned();
                } else {
                    warn!("Updated transaction which is no longer in mempool");
                }
            }
        }

        // Then notify all peers.
        for updated in updated_txs {
            let pmsg = MainToPeerTask::TransactionNotification((&updated).try_into().unwrap());
            self.main_to_peer_broadcast(pmsg);
        }

        // Tell the miner that it can now start composing the next block.
        self.main_to_miner_tx.send(MainToMiner::Continue);
    }

    /// Process a block whose PoW puzzle was solved by this client (or an
    /// external program) and which has not yet been seen by the rest of the
    /// network.
    ///
    /// Shares the block with all connected peers, updates own state, and
    /// updates any mempool transactions to be valid under this new block.
    ///
    /// Locking:
    ///  * acquires `global_state_lock` for read and write
    async fn handle_self_guessed_block(
        &mut self,
        main_loop_state: &mut MutableMainLoopState,
        new_block: Box<Block>,
    ) -> Result<()> {
        let new_block_hash = new_block.hash();

        // Clone the block in advance, so the lock is held for less time.
        // Note that this clone is wasted if the block is not more canonical,
        // but that should be the less common case.
        //
        // perf: in the future we should use Arc systematically to avoid these
        // expensive block clones.
        let new_block_clone = (*new_block).clone();

        // important! The is_canonical check and set_new_tip() need to be an
        // atomic operation, i.e. called within the same write-lock
        // acquisition.
        //
        // This avoids a race condition where blocks B and C are both more
        // canonical than A, and B is more canonical than C, yet C replaces B
        // because it was only checked against A.
        //
        // We release the lock as quickly as possible.
        let update_jobs = {
            let mut gsm = self.global_state_lock.lock_guard_mut().await;

            // Bail out if the incoming block is not more canonical than the
            // present tip.
            if !gsm.incoming_block_is_more_canonical(&new_block) {
                drop(gsm); // drop the lock right away, before send.
                warn!("Got new block from miner that was not child of tip. Discarding.");
                self.main_to_miner_tx.send(MainToMiner::Continue);
                return Ok(());
            }
            // Set the new tip and obtain the list of update-jobs to perform.
            // The jobs update mutator-set data for:
            //   all tx if we are in composer role;
            //   else self-owned tx.
            // See: Mempool::update_with_block_and_predecessor()
            let update_jobs = gsm.set_new_tip(new_block_clone).await?;
            gsm.flush_databases().await?;
            update_jobs
        }; // write-lock is dropped here.

        // Share the block with peers right away.
        let pmsg = MainToPeerTask::Block(new_block);
        self.main_to_peer_broadcast(pmsg);

        info!("Locally-mined block is new tip: {}", new_block_hash);
        info!("broadcasting new block to peers");

        self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

        Ok(())
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_miner_task_message(
        &mut self,
        msg: MinerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<Option<i32>> {
        match msg {
            MinerToMain::NewBlockFound(new_block_info) => {
                log_slow_scope!(fn_name!() + "::MinerToMain::NewBlockFound");

                let new_block = new_block_info.block;

                info!("Miner found new block: {}", new_block.kernel.header.height);
                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
            }
            MinerToMain::BlockProposal(boxed_proposal) => {
                let (block, expected_utxos) = *boxed_proposal;

                // If the block proposal from the miner does not build on the
                // current tip, don't broadcast it. This check covers reorgs
                // as well.
                let current_tip = self
                    .global_state_lock
                    .lock_guard()
                    .await
                    .chain
                    .light_state()
                    .clone();
                if block.header().prev_block_digest != current_tip.hash() {
                    warn!(
                        "Got block proposal from miner that does not build on current tip. \
                           Rejecting. If this happens a lot, then maybe this machine is too \
                           slow to competitively compose blocks. Consider running the client only \
                           with the guesser flag set and not the compose flag."
                    );
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                // Ensure the proposal's validity before sharing it.
                if !block
                    .is_valid(
                        &current_tip,
                        block.header().timestamp,
                        self.global_state_lock.cli().network,
                    )
                    .await
                {
                    error!("Own block proposal invalid. This should not happen.");
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                if !self.global_state_lock.cli().secret_compositions {
                    let pmsg = MainToPeerTask::BlockProposalNotification((&block).into());
                    self.main_to_peer_broadcast(pmsg);
                }

                {
                    // Use the block proposal and add the expected UTXOs from
                    // this proposal.
                    let mut state = self.global_state_lock.lock_guard_mut().await;
                    state.mining_state.block_proposal =
                        BlockProposal::own_proposal(block.clone(), expected_utxos.clone());
                    state.wallet_state.add_expected_utxos(expected_utxos).await;
                }

                // Indicate to the miner that the block proposal was
                // successfully received by the main loop.
                self.main_to_miner_tx.send(MainToMiner::Continue);
            }
            MinerToMain::Shutdown(exit_code) => {
                return Ok(Some(exit_code));
            }
        }

        Ok(None)
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_peer_task_message(
        &mut self,
        msg: PeerTaskToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<()> {
        debug!("Received {} from a peer task", msg.get_type());
        let cli_args = self.global_state_lock.cli().clone();
        match msg {
            PeerTaskToMain::NewBlocks(blocks) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::NewBlocks");

                let last_block = blocks.last().unwrap().to_owned();
                let update_jobs = {
                    // The peer tasks also check this condition, i.e. whether
                    // the block is more canonical than the current tip, but we
                    // have to check it again, since the block update might
                    // already have been applied through a message from another
                    // peer (or from our own miner).
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let new_canonical =
                        global_state_mut.incoming_block_is_more_canonical(&last_block);

                    if !new_canonical {
                        // The blocks are not canonical, but: if we are in sync
                        // mode and these blocks beat our current champion, then
                        // we store them anyway, without marking them as tip.
                        let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else {
                            warn!(
                                "Blocks were not new, and we're not syncing. Not storing blocks."
                            );
                            return Ok(());
                        };
                        if sync_anchor
                            .champion
                            .is_some_and(|(height, _)| height >= last_block.header().height)
                        {
                            warn!("Repeated blocks received in sync mode, not storing");
                            return Ok(());
                        }

                        sync_anchor.catch_up(last_block.header().height, last_block.hash());

                        for block in blocks {
                            global_state_mut.store_block_not_tip(block).await?;
                        }

                        global_state_mut.flush_databases().await?;

                        return Ok(());
                    }

                    info!(
                        "Last block from peer is new canonical tip: {}; height: {}",
                        last_block.hash(),
                        last_block.header().height
                    );

                    // Ask the miner to stop work until the state update is
                    // completed.
                    self.main_to_miner_tx.send(MainToMiner::WaitForContinue);

                    // Get out of sync mode if needed.
                    if global_state_mut.net.sync_anchor.is_some() {
                        let stay_in_sync_mode = stay_in_sync_mode(
                            &last_block.kernel.header,
                            &main_loop_state.sync_state,
                            cli_args.sync_mode_threshold,
                        );
                        if !stay_in_sync_mode {
                            info!("Exiting sync mode");
                            global_state_mut.net.sync_anchor = None;
                            self.main_to_miner_tx.send(MainToMiner::StopSyncing);
                        }
                    }

                    let mut update_jobs: Vec<UpdateMutatorSetDataJob> = vec![];
                    for new_block in blocks {
                        debug!(
                            "Storing block {} in database. Height: {}, Mined: {}",
                            new_block.hash(),
                            new_block.kernel.header.height,
                            new_block.kernel.header.timestamp.standard_format()
                        );

                        // Potential race condition here: if the last block is
                        // new and canonical but the first block is already
                        // known, we'll store the same block twice. That should
                        // be OK though, as the appropriate database entries
                        // are simply overwritten with the new block info. See
                        // the
                        // [GlobalState::tests::setting_same_tip_twice_is_allowed]
                        // test for a test of this phenomenon.

                        let update_jobs_ = global_state_mut.set_new_tip(new_block).await?;
                        update_jobs.extend(update_jobs_);
                    }

                    global_state_mut.flush_databases().await?;

                    update_jobs
                };

                // Inform all peers about the new block.
                let pmsg = MainToPeerTask::Block(Box::new(last_block.clone()));
                self.main_to_peer_broadcast(pmsg);

                // Spawn a task to handle mempool tx-updating after new blocks.
                // TODO: Do a clever trick to collapse all jobs relating to the
                //       same transaction, identified by transaction-ID, into
                //       *one* update job.
                self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

                // Inform the miner about the new block.
                self.main_to_miner_tx.send(MainToMiner::NewBlock);
            }
            PeerTaskToMain::AddPeerMaxBlockHeight {
                peer_address,
                claimed_height,
                claimed_cumulative_pow,
                claimed_block_mmra,
            } => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight");

                let claimed_state =
                    PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow);
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .insert(peer_address, claimed_state);

                // Check if synchronization mode should be activated.
                // Synchronization mode is entered if the claimed accumulated
                // PoW exceeds that of our tip and the height difference is
                // positive and beyond a threshold value.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow)
                    && global_state_mut
                        .net
                        .sync_anchor
                        .as_ref()
                        .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow)
                {
                    info!(
                        "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}",
                        peer_address, claimed_height, claimed_cumulative_pow
                    );
                    global_state_mut.net.sync_anchor =
                        Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra));
                    self.main_to_miner_tx.send(MainToMiner::StartSyncing);
                }
            }
            PeerTaskToMain::RemovePeerMaxBlockHeight(socket_addr) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::RemovePeerMaxBlockHeight");

                debug!(
                    "Removing max block height from sync data structure for peer {}",
                    socket_addr
                );
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .remove(&socket_addr);

                // Get out of sync mode if needed.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;

                if global_state_mut.net.sync_anchor.is_some() {
                    let stay_in_sync_mode = stay_in_sync_mode(
                        global_state_mut.chain.light_state().header(),
                        &main_loop_state.sync_state,
                        cli_args.sync_mode_threshold,
                    );
                    if !stay_in_sync_mode {
                        info!("Exiting sync mode");
                        global_state_mut.net.sync_anchor = None;
                    }
                }
            }
            PeerTaskToMain::PeerDiscoveryAnswer((pot_peers, reported_by, distance)) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer");

                let max_peers = self.global_state_lock.cli().max_num_peers;
                for pot_peer in pot_peers {
                    main_loop_state.potential_peers.add(
                        reported_by,
                        pot_peer,
                        max_peers,
                        distance,
                        self.now(),
                    );
                }
            }
            PeerTaskToMain::Transaction(pt2m_transaction) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::Transaction");

                debug!(
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    pt2m_transaction.transaction.kernel.inputs.len(),
                    pt2m_transaction.transaction.kernel.outputs.len(),
                    pt2m_transaction.transaction.kernel.mutator_set_hash
                );

                {
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    if pt2m_transaction.confirmable_for_block
                        != global_state_mut.chain.light_state().hash()
                    {
                        warn!("main loop got unmined transaction with bad mutator set data, discarding transaction");
                        return Ok(());
                    }

                    // Insert into the mempool.
                    global_state_mut
                        .mempool_insert(
                            pt2m_transaction.transaction.to_owned(),
                            TransactionOrigin::Foreign,
                        )
                        .await;
                }

                // Send a notification to peers.
                let transaction_notification: TransactionNotification =
                    (&pt2m_transaction.transaction).try_into()?;

                let pmsg = MainToPeerTask::TransactionNotification(transaction_notification);
                self.main_to_peer_broadcast(pmsg);
            }
            PeerTaskToMain::BlockProposal(block) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::BlockProposal");

                debug!("main loop received block proposal from peer loop");

                // Due to race conditions, we need to verify that this block
                // proposal is still the immediate child of the tip. If it is,
                // and it has a higher guesser fee than what we're currently
                // working on, then we switch to it and notify the miner to
                // mine on this new block. We don't need to verify the block's
                // validity, since that was done in the peer loop.
                // To ensure atomicity, a write-lock must be held over global
                // state while we check if this proposal is favorable.
                {
                    info!("Received new favorable block proposal for mining operation.");
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let verdict = global_state_mut.favor_incoming_block_proposal(
                        block.header().height,
                        block.total_guesser_reward(),
                    );
                    if let Err(reject_reason) = verdict {
                        warn!("main loop got unfavorable block proposal. Reason: {reject_reason}");
                        return Ok(());
                    }

                    global_state_mut.mining_state.block_proposal =
                        BlockProposal::foreign_proposal(*block.clone());
                }

                // Notify all peers of the block proposal we just accepted.
                let pmsg = MainToPeerTask::BlockProposalNotification((&*block).into());
                self.main_to_peer_broadcast(pmsg);

                self.main_to_miner_tx.send(MainToMiner::NewBlockProposal);
            }
            PeerTaskToMain::DisconnectFromLongestLivedPeer => {
                let global_state = self.global_state_lock.lock_guard().await;

                // Get all peers.
                let all_peers = global_state.net.peer_map.iter();

                // Filter out CLI peers.
                let disconnect_candidates =
                    all_peers.filter(|p| !global_state.cli_peers().contains(p.0));

                // Find the one with the oldest connection.
                let longest_lived_peer = disconnect_candidates.min_by(
                    |(_socket_address_left, peer_info_left),
                     (_socket_address_right, peer_info_right)| {
                        peer_info_left
                            .connection_established()
                            .cmp(&peer_info_right.connection_established())
                    },
                );

                // Tell it to disconnect.
                if let Some((peer_socket, _peer_info)) = longest_lived_peer {
                    let pmsg = MainToPeerTask::Disconnect(peer_socket.to_owned());
                    self.main_to_peer_broadcast(pmsg);
                }
            }
        }

        Ok(())
    }

    /// If necessary, disconnect from peers.
    ///
    /// While a reasonable effort is made to never have more connections than
    /// [`max_num_peers`](crate::config_models::cli_args::Args::max_num_peers),
    /// this is not guaranteed. For example, bootstrap nodes temporarily allow a
    /// surplus of incoming connections to provide their service more reliably.
    ///
    /// Never disconnects peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn prune_peers(&self) -> Result<()> {
        // Fetch all relevant info from global state; don't hold the lock.
        let cli_args = self.global_state_lock.cli();
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .values()
            .cloned()
            .collect_vec();

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;
        if num_peers <= max_num_peers {
            debug!("No need to prune any peer connections.");
            return Ok(());
        }
        warn!("Connected to {num_peers} peers, which exceeds the maximum ({max_num_peers}).");

        // If all connections are outbound, it's OK to exceed the max.
        if connected_peers.iter().all(|p| p.connection_is_outbound()) {
            warn!("Not disconnecting from any peer because all connections are outbound.");
            return Ok(());
        }

        let num_peers_to_disconnect = num_peers - max_num_peers;
        let peers_to_disconnect = connected_peers
            .into_iter()
            .filter(|peer| !cli_args.peers.contains(&peer.connected_address()))
            .choose_multiple(&mut rand::rng(), num_peers_to_disconnect);
        match peers_to_disconnect.len() {
            0 => warn!("Not disconnecting from any peer because of manual override."),
            i => info!("Disconnecting from {i} peers."),
        }
        for peer in peers_to_disconnect {
            let pmsg = MainToPeerTask::Disconnect(peer.connected_address());
            self.main_to_peer_broadcast(pmsg);
        }

        Ok(())
    }
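    // Worked example (illustrative): with `max_num_peers` = 10 and 13 current
    // connections, of which 2 were supplied via CLI, three victims are drawn
    // at random from the 11 non-CLI peers and each is sent a disconnect
    // message.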

    /// If necessary, reconnect to the peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn reconnect(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .keys()
            .copied()
            .collect_vec();
        let peers_with_lost_connection = self
            .global_state_lock
            .cli()
            .peers
            .iter()
            .filter(|peer| !connected_peers.contains(peer));

        // If no connection was lost, there's nothing to do.
        if peers_with_lost_connection.clone().count() == 0 {
            return Ok(());
        }

        // Else, try to reconnect.
        let own_handshake_data = self
            .global_state_lock
            .lock_guard()
            .await
            .get_own_handshakedata();
        for &peer_with_lost_connection in peers_with_lost_connection {
            // Disallow reconnection if the peer is in bad standing.
            let peer_standing = self
                .global_state_lock
                .lock_guard()
                .await
                .net
                .get_peer_standing_from_database(peer_with_lost_connection.ip())
                .await;
            if peer_standing.is_some_and(|standing| standing.is_bad()) {
                info!("Not reconnecting to peer in bad standing: {peer_with_lost_connection}");
                continue;
            }

            info!("Attempting to reconnect to peer: {peer_with_lost_connection}");
            let global_state_lock = self.global_state_lock.clone();
            let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
            let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
            let own_handshake_data = own_handshake_data.clone();
            let outgoing_connection_task = tokio::task::Builder::new()
                .name("call_peer_wrapper_1")
                .spawn(async move {
                    call_peer(
                        peer_with_lost_connection,
                        global_state_lock,
                        main_to_peer_broadcast_rx,
                        peer_task_to_main_tx,
                        own_handshake_data,
                        1, // All CLI-specified peers have distance 1.
                    )
                    .await;
                })?;
            main_loop_state.task_handles.push(outgoing_connection_task);
            main_loop_state.task_handles.retain(|th| !th.is_finished());
        }

        Ok(())
    }

    /// Perform peer discovery.
    ///
    /// Peer discovery involves finding potential peers from the already
    /// connected peers and attempting to establish a connection with one of
    /// them.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn discover_peers(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        // Fetch all relevant info from global state, then release the lock.
        let cli_args = self.global_state_lock.cli();
        let global_state = self.global_state_lock.lock_guard().await;
        let connected_peers = global_state.net.peer_map.values().cloned().collect_vec();
        let own_instance_id = global_state.net.instance_id;
        let own_handshake_data = global_state.get_own_handshakedata();
        drop(global_state);

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;

        // Don't make an outgoing connection if
        // - the peer limit is reached (or exceeded), or
        // - the peer limit is _almost_ reached; reserve the last slot for an
        //   incoming connection.
        if num_peers >= max_num_peers || num_peers > 2 && num_peers - 1 == max_num_peers {
            info!("Connected to {num_peers} peers. The configured max is {max_num_peers} peers.");
            info!("Skipping peer discovery.");
            return Ok(());
        }

        info!("Performing peer discovery");

        // Ask all peers for their peer lists. This will eventually – once the
        // responses have come in – update the list of potential peers.
        let pmsg = MainToPeerTask::MakePeerDiscoveryRequest;
        self.main_to_peer_broadcast(pmsg);

        // Get a peer candidate from the list of potential peers. Generally,
        // the peer lists requested in the previous step will not have come in
        // yet. Therefore, the new candidate is selected based on somewhat
        // (but not overly) old information.
        let Some((peer_candidate, candidate_distance)) = main_loop_state
            .potential_peers
            .get_candidate(&connected_peers, own_instance_id)
        else {
            info!("Found no peer candidate to connect to. Not making new connection.");
            return Ok(());
        };

        // Try to connect to the selected candidate.
        info!("Connecting to peer {peer_candidate} with distance {candidate_distance}");
        let global_state_lock = self.global_state_lock.clone();
        let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
        let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
        let outgoing_connection_task = tokio::task::Builder::new()
            .name("call_peer_wrapper_2")
            .spawn(async move {
                call_peer(
                    peer_candidate,
                    global_state_lock,
                    main_to_peer_broadcast_rx,
                    peer_task_to_main_tx,
                    own_handshake_data,
                    candidate_distance,
                )
                .await;
            })?;
        main_loop_state.task_handles.push(outgoing_connection_task);
        main_loop_state.task_handles.retain(|th| !th.is_finished());

        // Immediately request the new peer's peer list. This allows
        // incorporating the new peer's peers into the list of potential peers,
        // to be used in the next round of peer discovery.
        let m2pmsg = MainToPeerTask::MakeSpecificPeerDiscoveryRequest(peer_candidate);
        self.main_to_peer_broadcast(m2pmsg);

        Ok(())
    }

    /// Return a list of block heights for a block-batch request.
    ///
    /// Returns an ordered list of heights of the *most preferred blocks* to
    /// build on, where the current tip is always the most preferred block.
    ///
    /// Uses a factor to ensure that the peer will always have something to
    /// build on top of, by providing potential starting points all the way
    /// back to genesis.
    fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec<BlockHeight> {
        const FACTOR: f64 = 1.07f64;

        let mut look_behind = 0;
        let mut ret = vec![];

        // A factor of 1.07 can look back ~1m blocks in 200 digests.
        while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 {
            let height = match own_tip_height.checked_sub(look_behind) {
                None => break,
                Some(height) if height.is_genesis() => break,
                Some(height) => height,
            };

            ret.push(height);
            look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64;
        }

        ret.push(BlockHeight::genesis());

        ret
    }
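    // Illustrative trace of the loop above: the look-behind offsets grow
    // geometrically, roughly 0, 1, 2, ..., 13, 14, 16, 18, 20, 22, ...,
    // i.e. dense near the tip and exponentially sparser towards genesis.
    // Since 1.07^199 is on the order of 7 * 10^5, ~200 digests suffice to
    // anchor a chain roughly a million blocks long.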

    /// Logic for requesting the batch-download of blocks from peers
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let global_state = self.global_state_lock.lock_guard().await;

        // Check if we are in sync mode
        let Some(anchor) = &global_state.net.sync_anchor else {
            return Ok(());
        };

        info!("Running sync");

        let (own_tip_hash, own_tip_height, own_cumulative_pow) = (
            global_state.chain.light_state().hash(),
            global_state.chain.light_state().kernel.header.height,
            global_state
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work,
        );

        // Check if sync mode has timed out entirely, in which case it should
        // be abandoned.
        let anchor = anchor.to_owned();
        if self.now().duration_since(anchor.updated)? > GLOBAL_SYNCHRONIZATION_TIMEOUT {
            warn!("Sync mode has timed out. Abandoning sync mode.");

            // Abandon attempt, and punish all peers claiming to serve these
            // blocks.
            drop(global_state);
            self.global_state_lock
                .lock_guard_mut()
                .await
                .net
                .sync_anchor = None;

            let peers_to_punish = main_loop_state
                .sync_state
                .get_potential_peers_for_sync_request(own_cumulative_pow);

            for peer in peers_to_punish {
                let pmsg = MainToPeerTask::PeerSynchronizationTimeout(peer);
                self.main_to_peer_broadcast(pmsg);
            }

            return Ok(());
        }

        let (peer_to_sanction, try_new_request): (Option<SocketAddr>, bool) = main_loop_state
            .sync_state
            .get_status_of_last_request(own_tip_height, self.now());

        // Sanction peer if they failed to respond
        if let Some(peer) = peer_to_sanction {
            let pmsg = MainToPeerTask::PeerSynchronizationTimeout(peer);
            self.main_to_peer_broadcast(pmsg);
        }

        if !try_new_request {
            info!("Waiting for last sync to complete.");
            return Ok(());
        }

        // Create the next request from the reported peer sync state.
        info!("Creating new sync request");

        // Pick a random peer that has reported having relevant blocks
        let candidate_peers = main_loop_state
            .sync_state
            .get_potential_peers_for_sync_request(own_cumulative_pow);
        let chosen_peer = candidate_peers.choose(&mut rand::rng());
        assert!(
            chosen_peer.is_some(),
            "A synchronization candidate must be available for a request. \
            Otherwise, the data structure is in an invalid state and syncing should not be active"
        );

        let ordered_preferred_block_digests = match anchor.champion {
            Some((_height, digest)) => vec![digest],
            None => {
                // Find candidate-UCA digests based on a sparse distribution of
                // block heights skewed towards own tip height
                let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height);
                let mut ordered_preferred_block_digests = vec![];
                for height in request_heights {
                    let digest = global_state
                        .chain
                        .archival_state()
                        .archival_block_mmr
                        .ammr()
                        .get_leaf_async(height.into())
                        .await;
                    ordered_preferred_block_digests.push(digest);
                }
                ordered_preferred_block_digests
            }
        };

        // Send message to the relevant peer loop to request the blocks
        let chosen_peer = chosen_peer.unwrap();
        info!(
            "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}",
            chosen_peer, own_tip_hash, own_tip_height
        );
        let pmsg = MainToPeerTask::RequestBlockBatch(MainToPeerTaskBatchBlockRequest {
            peer_addr_target: *chosen_peer,
            known_blocks: ordered_preferred_block_digests,
            anchor_mmr: anchor.block_mmr.clone(),
        });
        self.main_to_peer_broadcast(pmsg);

        // Record that this request was sent to the peer
        let requested_block_height = own_tip_height.next();
        main_loop_state
            .sync_state
            .record_request(requested_block_height, *chosen_peer, self.now());

        Ok(())
    }
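
    // A minimal sketch (hypothetical names) of the timeout check above:
    // compare the wall-clock time elapsed since the sync anchor was last
    // updated against a global deadline. `duration_since` errs if the clock
    // went backwards, which `block_sync` surfaces with `?`.
    fn sync_has_timed_out(
        now: std::time::SystemTime,
        anchor_updated: std::time::SystemTime,
        timeout: std::time::Duration,
    ) -> std::result::Result<bool, std::time::SystemTimeError> {
        Ok(now.duration_since(anchor_updated)? > timeout)
    }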

    /// Scheduled task for upgrading the proofs of transactions in the mempool.
    ///
    /// Will either merge two transactions that are both supported by single
    /// proofs, or upgrade a transaction proof of the type `ProofCollection`
    /// to `SingleProof`.
    ///
    /// All proving takes place in a spawned task such that it doesn't block
    /// the main loop. The MutableMainLoopState gets the JoinHandle of the
    /// spawned upgrade task such that its status can be inspected.
    async fn proof_upgrader(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        fn attempt_upgrade(
            global_state: &GlobalState,
            now: SystemTime,
            tx_upgrade_interval: Option<Duration>,
            main_loop_state: &MutableMainLoopState,
        ) -> Result<bool> {
            let duration_since_last_upgrade =
                now.duration_since(global_state.net.last_tx_proof_upgrade_attempt)?;
            let previous_upgrade_task_is_still_running = main_loop_state
                .proof_upgrader_task
                .as_ref()
                .is_some_and(|x| !x.is_finished());
            Ok(global_state.net.sync_anchor.is_none()
                && global_state.proving_capability() == TxProvingCapability::SingleProof
                && !previous_upgrade_task_is_still_running
                && tx_upgrade_interval
                    .is_some_and(|upgrade_interval| duration_since_last_upgrade > upgrade_interval))
        }

        trace!("Running proof upgrader scheduled task");

        // Check if it's time to run the proof-upgrader, and if we're capable
        // of upgrading a transaction proof.
        let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval();
        let (upgrade_candidate, tx_origin) = {
            let global_state = self.global_state_lock.lock_guard().await;
            let now = self.now();
            if !attempt_upgrade(&global_state, now, tx_upgrade_interval, main_loop_state)? {
                trace!("Not attempting upgrade.");
                return Ok(());
            }

            debug!("Attempting to run transaction-proof-upgrade");

            // Find a candidate for proof upgrade
            let Some((upgrade_candidate, tx_origin)) = get_upgrade_task_from_mempool(&global_state)
            else {
                debug!("Found no transaction-proof to upgrade");
                return Ok(());
            };

            (upgrade_candidate, tx_origin)
        };

        info!(
            "Attempting to upgrade transaction proofs of: {}",
            upgrade_candidate.affected_txids().iter().join("; ")
        );

        // Perform the upgrade, if we're not using the prover for anything else,
        // like mining, or proving our own transaction. Running the prover takes
        // a long time (minutes), so we spawn a task for this such that we do
        // not block the main loop.
        let vm_job_queue = vm_job_queue();
        let perform_ms_update_if_needed =
            self.global_state_lock.cli().proving_capability() == TxProvingCapability::SingleProof;

        let global_state_lock_clone = self.global_state_lock.clone();
        let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
        let proof_upgrader_task =
            tokio::task::Builder::new()
                .name("proof_upgrader")
                .spawn(async move {
                    upgrade_candidate
                        .handle_upgrade(
                            vm_job_queue,
                            tx_origin,
                            perform_ms_update_if_needed,
                            global_state_lock_clone,
                            main_to_peer_broadcast_tx_clone,
                        )
                        .await
                })?;

        main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);

        Ok(())
    }
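
    // A minimal sketch (hypothetical name) of the gating predicate that
    // `attempt_upgrade` above implements: an upgrade runs only when the node
    // is not syncing, the previous upgrade task has finished, and the
    // configured interval has elapsed since the last attempt.
    fn should_run_scheduled_job(
        is_syncing: bool,
        previous_job_still_running: bool,
        elapsed_since_last_attempt: std::time::Duration,
        configured_interval: Option<std::time::Duration>,
    ) -> bool {
        !is_syncing
            && !previous_job_still_running
            && configured_interval.is_some_and(|interval| elapsed_since_last_attempt > interval)
    }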

    /// Post-processing when a new block has arrived. Spawns a task to update
    /// transactions in the mempool. Only when the spawned task has completed
    /// should the miner continue.
    fn spawn_mempool_txs_update_job(
        &self,
        main_loop_state: &mut MutableMainLoopState,
        update_jobs: Vec<UpdateMutatorSetDataJob>,
    ) {
        // Job completion of the spawned task is communicated through the
        // `update_mempool_txs_handle` channel.
        let vm_job_queue = vm_job_queue();
        if let Some(handle) = main_loop_state.update_mempool_txs_handle.as_ref() {
            handle.abort();
        }
        let (update_sender, update_receiver) =
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);

        // note: if this task is cancelled, the job will continue
        // because TritonVmJobOptions::cancel_job_rx is None.
        // see how compose_task handles cancellation in mine_loop.
        let job_options = self
            .global_state_lock
            .cli()
            .proof_job_options(TritonVmJobPriority::Highest);
        main_loop_state.update_mempool_txs_handle = Some(
            tokio::task::Builder::new()
                .name("mempool tx ms-updater")
                .spawn(async move {
                    Self::update_mempool_jobs(
                        update_jobs,
                        vm_job_queue.clone(),
                        update_sender,
                        job_options,
                    )
                    .await
                })
                .unwrap(),
        );
        main_loop_state.update_mempool_receiver = update_receiver;
    }
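
    // A self-contained sketch (hypothetical worker) of the abort-and-respawn
    // pattern above: cancel any in-flight worker, then hand the new worker a
    // fresh mpsc sender while the caller keeps the matching receiver.
    fn respawn_worker(
        previous: &mut Option<tokio::task::JoinHandle<()>>,
    ) -> tokio::sync::mpsc::Receiver<String> {
        // Request cancellation of any previous worker still in flight.
        if let Some(handle) = previous.as_ref() {
            handle.abort();
        }
        let (sender, receiver) = tokio::sync::mpsc::channel::<String>(4);
        *previous = Some(tokio::spawn(async move {
            // Stand-in for the real job; results flow back over the channel.
            let _ = sender.send("job result".to_string()).await;
        }));
        receiver
    }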

    pub async fn run(&mut self) -> Result<i32> {
        info!("Starting main loop");

        let task_handles = std::mem::take(&mut self.task_handles);

        // Handle incoming connections, messages from peer tasks, and messages from the mining task
        let mut main_loop_state = MutableMainLoopState::new(task_handles);

        // Set up various timers.
        //
        // The `MissedTickBehavior::Delay` is appropriate for tasks that don't
        // do anything meaningful if executed in quick succession. For example,
        // pruning stale information immediately after pruning stale information
        // is almost certainly a no-op.
        // Similarly, tasks performing network operations (e.g., peer discovery)
        // should probably not try to “catch up” if some ticks were missed.

        // Don't run peer discovery immediately at startup since outgoing
        // connections started from lib.rs may not have finished yet.
        let mut peer_discovery_interval = time::interval_at(
            Instant::now() + PEER_DISCOVERY_INTERVAL,
            PEER_DISCOVERY_INTERVAL,
        );
        peer_discovery_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut block_sync_interval = time::interval(SYNC_REQUEST_INTERVAL);
        block_sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut mempool_cleanup_interval = time::interval(MEMPOOL_PRUNE_INTERVAL);
        mempool_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut utxo_notification_cleanup_interval = time::interval(EXPECTED_UTXOS_PRUNE_INTERVAL);
        utxo_notification_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut mp_resync_interval = time::interval(MP_RESYNC_INTERVAL);
        mp_resync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut tx_proof_upgrade_interval = time::interval(TRANSACTION_UPGRADE_CHECK_INTERVAL);
        tx_proof_upgrade_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // Spawn tasks to monitor for SIGTERM, SIGINT, and SIGQUIT. These
        // signals are only used on Unix systems.
        let (tx_term, mut rx_term) = mpsc::channel::<()>(2);
        let (tx_int, mut rx_int) = mpsc::channel::<()>(2);
        let (tx_quit, mut rx_quit) = mpsc::channel::<()>(2);
        #[cfg(unix)]
        {
            use tokio::signal::unix::signal;
            use tokio::signal::unix::SignalKind;

            // Monitor for SIGTERM
            let mut sigterm = signal(SignalKind::terminate())?;
            tokio::task::Builder::new()
                .name("sigterm_handler")
                .spawn(async move {
                    if sigterm.recv().await.is_some() {
                        info!("Received SIGTERM");
                        tx_term.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGINT
            let mut sigint = signal(SignalKind::interrupt())?;
            tokio::task::Builder::new()
                .name("sigint_handler")
                .spawn(async move {
                    if sigint.recv().await.is_some() {
                        info!("Received SIGINT");
                        tx_int.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGQUIT
            let mut sigquit = signal(SignalKind::quit())?;
            tokio::task::Builder::new()
                .name("sigquit_handler")
                .spawn(async move {
                    if sigquit.recv().await.is_some() {
                        info!("Received SIGQUIT");
                        tx_quit.send(()).await.unwrap();
                    }
                })?;
        }

        #[cfg(not(unix))]
        drop((tx_term, tx_int, tx_quit));

        let exit_code: i32 = loop {
            select! {
                Ok(()) = signal::ctrl_c() => {
                    info!("Detected Ctrl+c signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Monitor for SIGTERM, SIGINT, and SIGQUIT.
                Some(_) = rx_term.recv() => {
                    info!("Detected SIGTERM signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_int.recv() => {
                    info!("Detected SIGINT signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_quit.recv() => {
                    info!("Detected SIGQUIT signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Handle incoming connections from peer
                Ok((stream, peer_address)) = self.incoming_peer_listener.accept() => {
                    // Return early if no incoming connections are accepted. Do
                    // not send application-handshake.
                    if self.global_state_lock.cli().disallow_all_incoming_peer_connections() {
                        warn!("Got incoming connection despite not accepting any. Ignoring");
                        continue;
                    }

                    let state = self.global_state_lock.lock_guard().await;
                    let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> = self.main_to_peer_broadcast_tx.subscribe();
                    let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = self.peer_task_to_main_tx.clone();
                    let own_handshake_data: HandshakeData = state.get_own_handshakedata();
                    let global_state_lock = self.global_state_lock.clone(); // bump arc refcount.
                    let incoming_peer_task_handle = tokio::task::Builder::new()
                        .name("answer_peer_wrapper")
                        .spawn(async move {
                        match answer_peer(
                            stream,
                            global_state_lock,
                            peer_address,
                            main_to_peer_broadcast_rx_clone,
                            peer_task_to_main_tx_clone,
                            own_handshake_data,
                        ).await {
                            Ok(()) => (),
                            Err(err) => error!("Got error: {:?}", err),
                        }
                    })?;
                    main_loop_state.task_handles.push(incoming_peer_task_handle);
                    main_loop_state.task_handles.retain(|th| !th.is_finished());
                }

                // Handle messages from peer tasks
                Some(msg) = self.peer_task_to_main_rx.recv() => {
                    debug!("Received message sent to main task.");
                    self.handle_peer_task_message(
                        msg,
                        &mut main_loop_state,
                    )
                    .await?
                }

                // Handle messages from miner task
                Some(main_message) = self.miner_to_main_rx.recv() => {
                    let exit_code = self.handle_miner_task_message(main_message, &mut main_loop_state).await?;

                    if let Some(exit_code) = exit_code {
                        break exit_code;
                    }

                }

                // Handle the completion of mempool tx-update jobs after new block.
                Some(ms_updated_transactions) = main_loop_state.update_mempool_receiver.recv() => {
                    self.handle_updated_mempool_txs(ms_updated_transactions).await;
                }

                // Handle messages from rpc server task
                Some(rpc_server_message) = self.rpc_server_to_main_rx.recv() => {
                    let shutdown_after_execution = self.handle_rpc_server_message(rpc_server_message.clone(), &mut main_loop_state).await?;
                    if shutdown_after_execution {
                        break SUCCESS_EXIT_CODE
                    }
                }

                // Handle peer discovery
                _ = peer_discovery_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::peer_discovery_interval");

                    // Check number of peers we are connected to and connect to
                    // more peers if needed.
                    debug!("Timer: peer discovery job");

                    // This check makes regtest mode behave in a local, controlled way:
                    // no regtest nodes attempt to discover each other, so the only
                    // peers are those that are manually added.
                    // see: https://github.com/Neptune-Crypto/neptune-core/issues/539#issuecomment-2764701027
                    if self.global_state_lock.cli().network.use_mock_proof() {
                        debug!("peer discovery disabled when network uses mock proofs (eg regtest)")
                    } else {
                        self.prune_peers().await?;
                        self.reconnect(&mut main_loop_state).await?;
                        self.discover_peers(&mut main_loop_state).await?;
                    }
                }

                // Handle synchronization (i.e. batch-downloading of blocks)
                _ = block_sync_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::block_sync_interval");

                    trace!("Timer: block-synchronization job");
                    self.block_sync(&mut main_loop_state).await?;
                }

                // Clean up mempool: remove stale / too old transactions
                _ = mempool_cleanup_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::mempool_cleanup_interval");

                    debug!("Timer: mempool-cleaner job");
                    self
                        .global_state_lock
                        .lock_guard_mut()
                        .await
                        .mempool_prune_stale_transactions()
                        .await;
                }

                // Clean up incoming UTXO notifications: remove stale / too old
                // UTXO notifications from pool
                _ = utxo_notification_cleanup_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::utxo_notification_cleanup_interval");

                    debug!("Timer: UTXO notification pool cleanup job");

                    // Danger: possible loss of funds.
                    //
                    // See description of prune_stale_expected_utxos().
                    //
                    // This call is disabled until such time as a thorough
                    // evaluation and perhaps reimplementation determines that
                    // it can be called safely without possible loss of funds.
                    // self.global_state_lock.lock_mut(|s| s.wallet_state.prune_stale_expected_utxos()).await;
                }

                // Handle membership proof resynchronization
                _ = mp_resync_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::mp_resync_interval");

                    debug!("Timer: Membership proof resync job");
                    self.global_state_lock.resync_membership_proofs().await?;
                }

                // run the proof upgrader
                _ = tx_proof_upgrade_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::tx_proof_upgrade_interval");

                    trace!("Timer: tx-proof-upgrader");
                    self.proof_upgrader(&mut main_loop_state).await?;
                }

            }
        };

        self.graceful_shutdown(main_loop_state.task_handles).await?;
        info!("Shutdown completed.");

        Ok(exit_code)
    }
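
    // A stripped-down, hypothetical sketch of the event-loop shape above: one
    // `select!` over a shutdown signal and a periodic timer, with
    // `MissedTickBehavior::Delay` so missed ticks are not replayed in bursts.
    async fn mini_event_loop() -> anyhow::Result<i32> {
        let mut maintenance = tokio::time::interval(std::time::Duration::from_secs(30));
        maintenance.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        let exit_code = loop {
            tokio::select! {
                Ok(()) = tokio::signal::ctrl_c() => {
                    // A signal wins the race: leave the loop with a success code.
                    break 0;
                }
                _ = maintenance.tick() => {
                    // Periodic work goes here; errors would propagate with `?`.
                }
            }
        };
        Ok(exit_code)
    }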

    /// Handle messages from the RPC server. Returns `true` iff the client should shut down
    /// after handling this message.
    async fn handle_rpc_server_message(
        &mut self,
        msg: RPCServerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<bool> {
        match msg {
            RPCServerToMain::BroadcastTx(transaction) => {
                debug!(
                    "`main` received the following transaction from the RPC server. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    transaction.kernel.inputs.len(),
                    transaction.kernel.outputs.len(),
                    transaction.kernel.mutator_set_hash
                );

                // note: this Tx must already have been added to the mempool by
                // the sender. This occurs in GlobalStateLock::record_transaction().

                // Is this a transaction we can share with peers? If so, share
                // it immediately.
                if let Ok(notification) = transaction.as_ref().try_into() {
                    let pmsg = MainToPeerTask::TransactionNotification(notification);
                    self.main_to_peer_broadcast(pmsg);
                } else {
                    // Otherwise, upgrade its proof quality, and share it by
                    // spinning up the proof upgrader.
                    let TransactionProof::Witness(primitive_witness) = transaction.proof.clone()
                    else {
                        panic!("Expected Primitive witness. Got: {:?}", transaction.proof);
                    };

                    let vm_job_queue = vm_job_queue();

                    let proving_capability = self.global_state_lock.cli().proving_capability();
                    let network = self.global_state_lock.cli().network;
                    let upgrade_job = UpgradeJob::from_primitive_witness(
                        network,
                        proving_capability,
                        primitive_witness,
                    );

                    // note: handle_upgrade() hands off proving to the
                    //       triton-vm job queue and waits for job completion.
                    // note: handle_upgrade() broadcasts to peers on success.

                    let global_state_lock_clone = self.global_state_lock.clone();
                    let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
                    let _proof_upgrader_task = tokio::task::Builder::new()
                        .name("proof_upgrader")
                        .spawn(async move {
                        upgrade_job
                            .handle_upgrade(
                                vm_job_queue.clone(),
                                TransactionOrigin::Own,
                                true,
                                global_state_lock_clone,
                                main_to_peer_broadcast_tx_clone,
                            )
                            .await
                    })?;

                    // main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);
                    // If the transaction could not be shared immediately because
                    // it contains secret data, upgrade its proof-type.
                }

                // do not shut down
                Ok(false)
            }
            RPCServerToMain::BroadcastMempoolTransactions => {
                info!("Broadcasting transaction notifications for all shareable transactions in mempool");
                let state = self.global_state_lock.lock_guard().await;
                let txs = state.mempool.get_sorted_iter().collect_vec();
                for (txid, _) in txs {
                    // Since a read-lock is held over global state, the
                    // transaction must exist in the mempool.
                    let tx = state
                        .mempool
                        .get(txid)
                        .expect("Transaction from iter must exist in mempool");
                    let notification = TransactionNotification::try_from(tx);
                    match notification {
                        Ok(notification) => {
                            let pmsg = MainToPeerTask::TransactionNotification(notification);
                            self.main_to_peer_broadcast(pmsg);
                        }
                        Err(error) => {
                            warn!("{error}");
                        }
                    };
                }
                Ok(false)
            }
            RPCServerToMain::ClearMempool => {
                info!("Clearing mempool");
                self.global_state_lock
                    .lock_guard_mut()
                    .await
                    .mempool_clear()
                    .await;

                Ok(false)
            }
            RPCServerToMain::ProofOfWorkSolution(new_block) => {
                info!("Handling PoW solution from RPC call");

                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
                Ok(false)
            }
            RPCServerToMain::PauseMiner => {
                info!("Received RPC request to stop miner");

                self.main_to_miner_tx.send(MainToMiner::StopMining);
                Ok(false)
            }
            RPCServerToMain::RestartMiner => {
                info!("Received RPC request to start miner");
                self.main_to_miner_tx.send(MainToMiner::StartMining);
                Ok(false)
            }
            RPCServerToMain::Shutdown => {
                info!("Received RPC shutdown request.");

                // shut down
                Ok(true)
            }
        }
    }
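
    // A minimal sketch (hypothetical, generic names) of the share-or-upgrade
    // decision in `BroadcastTx` above: if the transaction converts into a
    // peer notification, it can be broadcast right away; if conversion fails,
    // it still contains secret data and its proof must be upgraded first.
    fn try_notification<T, N>(tx: &T) -> Option<N>
    where
        for<'a> N: TryFrom<&'a T>,
    {
        // `Some(_)` means "share now"; `None` means "upgrade the proof first".
        N::try_from(tx).ok()
    }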

    async fn graceful_shutdown(&mut self, task_handles: Vec<JoinHandle<()>>) -> Result<()> {
        info!("Shutdown initiated.");

        // Stop mining
        self.main_to_miner_tx.send(MainToMiner::Shutdown);

        // Send 'bye' message to all peers.
        let pmsg = MainToPeerTask::DisconnectAll();
        self.main_to_peer_broadcast(pmsg);
        debug!("sent bye");

        // Flush all databases
        self.global_state_lock.flush_databases().await?;

        tokio::time::sleep(Duration::from_millis(50)).await;

        // Child tasks should have finished by now. If not, abort them forcibly.
        task_handles.iter().for_each(|jh| jh.abort());

        // Wait for all of them to finish.
        futures::future::join_all(task_handles).await;

        Ok(())
    }
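
    // A self-contained sketch of the abort-then-join pattern above: aborting
    // a tokio task only requests cancellation, so joining afterwards is what
    // guarantees every task has actually stopped before shutdown proceeds.
    async fn stop_all(task_handles: Vec<tokio::task::JoinHandle<()>>) {
        // Request cancellation of anything still running.
        task_handles.iter().for_each(|handle| handle.abort());

        // Await all handles; aborted tasks resolve with a `JoinError` whose
        // `is_cancelled()` is true, which is expected and ignored here.
        futures::future::join_all(task_handles).await;
    }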

    // Broadcasts a message to peers (if any are connected).
    //
    // Panics if the broadcast failed while the channel's receiver_count is
    // non-zero, indicating we have peer connections.
    fn main_to_peer_broadcast(&self, msg: MainToPeerTask) {
        if let Err(e) = self.main_to_peer_broadcast_tx.send(msg) {
            // TBD: maybe we should just log an error and ignore rather
            // than panic, but for now this preserves prior behavior.
            let receiver_count = self.main_to_peer_broadcast_tx.receiver_count();
            assert_eq!(
                receiver_count, 0,
                "failed to broadcast message from main to {} peer loops: {:?}",
                receiver_count, e
            );
        }
    }
}
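
// A small sketch of the broadcast-channel semantics the assertion above
// relies on: `broadcast::Sender::send` fails exactly when no receiver is
// subscribed, so a send error while `receiver_count()` is non-zero would
// indicate a real delivery bug rather than "no peers connected".
#[test]
fn broadcast_send_fails_only_without_receivers() {
    let (tx, rx) = tokio::sync::broadcast::channel::<u8>(8);

    // One live receiver: sending succeeds.
    assert!(tx.send(1).is_ok());

    // No receivers left: sending reports an error.
    drop(rx);
    assert!(tx.send(2).is_err());
    assert_eq!(0, tx.receiver_count());
}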

1866
#[cfg(test)]
1867
#[cfg_attr(coverage_nightly, coverage(off))]
1868
mod tests {
1869
    use std::str::FromStr;
1870
    use std::time::UNIX_EPOCH;
1871

1872
    use macro_rules_attr::apply;
1873
    use tracing_test::traced_test;
1874

1875
    use super::*;
1876
    use crate::config_models::cli_args;
1877
    use crate::config_models::network::Network;
1878
    use crate::tests::shared::get_dummy_peer_incoming;
1879
    use crate::tests::shared::get_test_genesis_setup;
1880
    use crate::tests::shared::invalid_empty_block;
1881
    use crate::tests::shared_tokio_runtime;
1882
    use crate::MINER_CHANNEL_CAPACITY;
1883

1884
    impl MainLoopHandler {
1885
        fn mutable(&mut self) -> MutableMainLoopState {
1886
            MutableMainLoopState::new(std::mem::take(&mut self.task_handles))
1887
        }
1888
    }
1889

1890
    struct TestSetup {
1891
        main_loop_handler: MainLoopHandler,
1892
        main_to_peer_rx: broadcast::Receiver<MainToPeerTask>,
1893
    }
1894

1895
    async fn setup(num_init_peers_outgoing: u8, num_peers_incoming: u8) -> TestSetup {
1896
        const CHANNEL_CAPACITY_MINER_TO_MAIN: usize = 10;
1897

1898
        let network = Network::Main;
1899
        let (
1900
            main_to_peer_tx,
1901
            main_to_peer_rx,
1902
            peer_to_main_tx,
1903
            peer_to_main_rx,
1904
            mut state,
1905
            _own_handshake_data,
1906
        ) = get_test_genesis_setup(network, num_init_peers_outgoing, cli_args::Args::default())
1907
            .await
1908
            .unwrap();
1909
        assert!(
1910
            state
1911
                .lock_guard()
1912
                .await
1913
                .net
1914
                .peer_map
1915
                .iter()
1916
                .all(|(_addr, peer)| peer.connection_is_outbound()),
1917
            "Test assumption: All initial peers must represent outgoing connections."
1918
        );
1919

1920
        for i in 0..num_peers_incoming {
1921
            let peer_address = SocketAddr::from_str(&format!("255.254.253.{i}:8080")).unwrap();
1922
            state
1923
                .lock_guard_mut()
1924
                .await
1925
                .net
1926
                .peer_map
1927
                .insert(peer_address, get_dummy_peer_incoming(peer_address));
1928
        }
1929

1930
        let incoming_peer_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1931

1932
        let (main_to_miner_tx, _main_to_miner_rx) =
1933
            mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
1934
        let (_miner_to_main_tx, miner_to_main_rx) =
1935
            mpsc::channel::<MinerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
1936
        let (_rpc_server_to_main_tx, rpc_server_to_main_rx) =
1937
            mpsc::channel::<RPCServerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
1938

1939
        let task_join_handles = vec![];
1940

1941
        let main_loop_handler = MainLoopHandler::new(
1942
            incoming_peer_listener,
1943
            state,
1944
            main_to_peer_tx,
1945
            peer_to_main_tx,
1946
            main_to_miner_tx,
1947
            peer_to_main_rx,
1948
            miner_to_main_rx,
1949
            rpc_server_to_main_rx,
1950
            task_join_handles,
1951
        );
1952
        TestSetup {
1953
            main_loop_handler,
1954
            main_to_peer_rx,
1955
        }
1956
    }
1957

1958
    #[apply(shared_tokio_runtime)]
1959
    async fn handle_self_guessed_block_new_tip() {
1960
        // A new tip is registered by main_loop. Verify correct state update.
1961
        let TestSetup {
1962
            mut main_loop_handler,
1963
            mut main_to_peer_rx,
1964
            ..
1965
        } = setup(1, 0).await;
1966
        let network = main_loop_handler.global_state_lock.cli().network;
1967
        let mut mutable_main_loop_state = main_loop_handler.mutable();
1968

1969
        let block1 = invalid_empty_block(&Block::genesis(network));
1970

1971
        assert!(
1972
            main_loop_handler
1973
                .global_state_lock
1974
                .lock_guard()
1975
                .await
1976
                .chain
1977
                .light_state()
1978
                .header()
1979
                .height
1980
                .is_genesis(),
1981
            "Tip must be genesis prior to handling of new block"
1982
        );
1983

1984
        let block1 = Box::new(block1);
1985
        main_loop_handler
1986
            .handle_self_guessed_block(&mut mutable_main_loop_state, block1.clone())
1987
            .await
1988
            .unwrap();
1989
        let new_block_height: u64 = main_loop_handler
1990
            .global_state_lock
1991
            .lock_guard()
1992
            .await
1993
            .chain
1994
            .light_state()
1995
            .header()
1996
            .height
1997
            .into();
1998
        assert_eq!(
1999
            1u64, new_block_height,
2000
            "Tip height must be 1 after handling of new block"
2001
        );
2002
        let msg_to_peer_loops = main_to_peer_rx.recv().await.unwrap();
2003
        if let MainToPeerTask::Block(block_to_peers) = msg_to_peer_loops {
2004
            assert_eq!(
2005
                block1, block_to_peers,
2006
                "Peer loops must have received block 1"
2007
            );
2008
        } else {
2009
            panic!("Must have sent block notification to peer loops")
2010
        }
2011
    }
2012

2013
    mod sync_mode {
2014
        use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
2015
        use test_strategy::proptest;
2016

2017
        use super::*;
2018
        use crate::tests::shared::get_dummy_socket_address;
2019

2020
        #[proptest]
2021
        fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) {
2022
            batch_request_heights_sanity(own_height);
2023
        }
2024

2025
        #[test]
2026
        fn batch_request_heights_unit() {
2027
            let own_height = 1_000_000u64;
2028
            batch_request_heights_sanity(own_height);
2029
        }
2030

2031
        fn batch_request_heights_sanity(own_height: u64) {
2032
            let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into());
2033

2034
            let mut heights_rev = heights.clone();
2035
            heights_rev.reverse();
2036
            assert!(
2037
                heights_rev.is_sorted(),
2038
                "Heights must be sorted from high-to-low"
2039
            );
2040

2041
            heights_rev.dedup();
2042
            assert_eq!(heights_rev.len(), heights.len(), "duplicates");
2043

2044
            assert_eq!(heights[0], own_height.into(), "starts with own tip height");
2045
            assert!(
2046
                heights.last().unwrap().is_genesis(),
2047
                "ends with genesis block"
2048
            );
2049
        }
2050

2051
        #[apply(shared_tokio_runtime)]
2052
        #[traced_test]
2053
        async fn sync_mode_abandoned_on_global_timeout() {
2054
            let num_outgoing_connections = 0;
2055
            let num_incoming_connections = 0;
2056
            let TestSetup {
2057
                mut main_loop_handler,
2058
                main_to_peer_rx: _main_to_peer_rx,
2059
                ..
2060
            } = setup(num_outgoing_connections, num_incoming_connections).await;
2061
            let mut mutable_main_loop_state = main_loop_handler.mutable();
2062

2063
            main_loop_handler
2064
                .block_sync(&mut mutable_main_loop_state)
2065
                .await
2066
                .expect("Must return OK when no sync mode is set");
2067

2068
            // Mock that we are in a valid sync state
2069
            let claimed_max_height = 1_000u64.into();
2070
            let claimed_max_pow = ProofOfWork::new([100; 6]);
2071
            main_loop_handler
2072
                .global_state_lock
2073
                .lock_guard_mut()
2074
                .await
2075
                .net
2076
                .sync_anchor = Some(SyncAnchor::new(
2077
                claimed_max_pow,
2078
                MmrAccumulator::new_from_leafs(vec![]),
2079
            ));
2080
            mutable_main_loop_state.sync_state.peer_sync_states.insert(
2081
                get_dummy_socket_address(0),
2082
                PeerSynchronizationState::new(claimed_max_height, claimed_max_pow),
2083
            );
2084

2085
            let sync_start_time = main_loop_handler
2086
                .global_state_lock
2087
                .lock_guard()
2088
                .await
2089
                .net
2090
                .sync_anchor
2091
                .as_ref()
2092
                .unwrap()
2093
                .updated;
2094
            main_loop_handler
2095
                .block_sync(&mut mutable_main_loop_state)
2096
                .await
2097
                .expect("Must return OK when sync mode has not timed out yet");
2098
            assert!(
2099
                main_loop_handler
2100
                    .global_state_lock
2101
                    .lock_guard()
2102
                    .await
2103
                    .net
2104
                    .sync_anchor
2105
                    .is_some(),
2106
                "Sync mode must still be set before timeout has occurred"
2107
            );
2108

2109
            assert_eq!(
2110
                sync_start_time,
2111
                main_loop_handler
2112
                    .global_state_lock
2113
                    .lock_guard()
2114
                    .await
2115
                    .net
2116
                    .sync_anchor
2117
                    .as_ref()
2118
                    .unwrap()
2119
                    .updated,
2120
                "timestamp may not be updated without state change"
2121
            );
2122

2123
            // Mock that sync-mode has timed out
2124
            main_loop_handler = main_loop_handler.with_mocked_time(
2125
                SystemTime::now() + GLOBAL_SYNCHRONIZATION_TIMEOUT + Duration::from_secs(1),
2126
            );
2127

2128
            main_loop_handler
2129
                .block_sync(&mut mutable_main_loop_state)
2130
                .await
2131
                .expect("Must return OK when sync mode has timed out");
2132
            assert!(
2133
                main_loop_handler
2134
                    .global_state_lock
2135
                    .lock_guard()
2136
                    .await
2137
                    .net
2138
                    .sync_anchor
2139
                    .is_none(),
2140
                "Sync mode must be unset on timeout"
2141
            );
2142
        }
2143
    }
2144

2145
    mod proof_upgrader {
2146
        use super::*;
2147
        use crate::models::blockchain::transaction::Transaction;
2148
        use crate::models::blockchain::transaction::TransactionProof;
2149
        use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
2150
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
2151
        use crate::models::proof_abstractions::timestamp::Timestamp;
2152
        use crate::models::state::tx_creation_config::TxCreationConfig;
2153
        use crate::models::state::wallet::transaction_output::TxOutput;
2154

2155
        async fn tx_no_outputs(
2156
            global_state_lock: &mut GlobalStateLock,
2157
            tx_proof_type: TxProvingCapability,
2158
            fee: NativeCurrencyAmount,
2159
        ) -> Arc<Transaction> {
2160
            let change_key = global_state_lock
2161
                .lock_guard()
2162
                .await
2163
                .wallet_state
2164
                .wallet_entropy
2165
                .nth_generation_spending_key_for_tests(0);
2166
            let in_seven_months = global_state_lock
2167
                .lock_guard()
2168
                .await
2169
                .chain
2170
                .light_state()
2171
                .header()
2172
                .timestamp
2173
                + Timestamp::months(7);
2174

2175
            let config = TxCreationConfig::default()
2176
                .recover_change_off_chain(change_key.into())
2177
                .with_prover_capability(tx_proof_type);
2178
            global_state_lock
2179
                .api()
2180
                .tx_initiator_internal()
2181
                .create_transaction(Vec::<TxOutput>::new().into(), fee, in_seven_months, config)
2182
                .await
2183
                .unwrap()
2184
                .transaction
2185
        }
2186

2187
        #[apply(shared_tokio_runtime)]
2188
        #[traced_test]
2189
        async fn upgrade_proof_collection_to_single_proof_foreign_tx() {
2190
            let num_outgoing_connections = 0;
2191
            let num_incoming_connections = 0;
2192
            let TestSetup {
2193
                mut main_loop_handler,
2194
                mut main_to_peer_rx,
2195
                ..
2196
            } = setup(num_outgoing_connections, num_incoming_connections).await;
2197

2198
            // Force instance to create SingleProofs, otherwise CI and other
2199
            // weak machines fail.
2200
            let mocked_cli = cli_args::Args {
2201
                tx_proving_capability: Some(TxProvingCapability::SingleProof),
2202
                tx_proof_upgrade_interval: 100, // seconds
2203
                ..Default::default()
2204
            };
2205

2206
            main_loop_handler
2207
                .global_state_lock
2208
                .set_cli(mocked_cli)
2209
                .await;
2210
            let mut main_loop_handler = main_loop_handler.with_mocked_time(SystemTime::now());
2211
            let mut mutable_main_loop_state = main_loop_handler.mutable();
2212

2213
            assert!(
2214
                main_loop_handler
2215
                    .proof_upgrader(&mut mutable_main_loop_state)
2216
                    .await
2217
                    .is_ok(),
2218
                "Scheduled task returns OK when run on empty mempool"
2219
            );
2220

2221
            let fee = NativeCurrencyAmount::coins(1);
2222
            let proof_collection_tx = tx_no_outputs(
2223
                &mut main_loop_handler.global_state_lock,
2224
                TxProvingCapability::ProofCollection,
2225
                fee,
2226
            )
2227
            .await;
2228

2229
            main_loop_handler
2230
                .global_state_lock
2231
                .lock_guard_mut()
2232
                .await
2233
                .mempool_insert((*proof_collection_tx).clone(), TransactionOrigin::Foreign)
2234
                .await;
2235

2236
            assert!(
2237
                main_loop_handler
2238
                    .proof_upgrader(&mut mutable_main_loop_state)
2239
                    .await
2240
                    .is_ok(),
2241
                "Scheduled task returns OK when it's not yet time to upgrade"
2242
            );
2243

2244
            assert!(
2245
                matches!(
2246
                    main_loop_handler
2247
                        .global_state_lock
2248
                        .lock_guard()
2249
                        .await
2250
                        .mempool
2251
                        .get(proof_collection_tx.kernel.txid())
2252
                        .unwrap()
2253
                        .proof,
2254
                    TransactionProof::ProofCollection(_)
2255
                ),
2256
                "Proof in mempool must still be of type proof collection"
2257
            );
2258

2259
            // Mock that enough time has passed to perform the upgrade. Then
2260
            // perform the upgrade.
2261
            let mut main_loop_handler =
2262
                main_loop_handler.with_mocked_time(SystemTime::now() + Duration::from_secs(300));
2263
            assert!(
2264
                main_loop_handler
2265
                    .proof_upgrader(&mut mutable_main_loop_state)
2266
                    .await
2267
                    .is_ok(),
2268
                "Scheduled task must return OK when it's time to upgrade"
2269
            );
2270

2271
            // Wait for upgrade task to finish.
2272
            let handle = mutable_main_loop_state.proof_upgrader_task.unwrap().await;
2273
            assert!(
2274
                handle.is_ok(),
2275
                "Proof-upgrade task must finish successfully."
2276
            );
2277

2278
            // At this point there should be one transaction in the mempool,
2279
            // which is (if all is well) the merger of the ProofCollection
2280
            // transaction inserted above and one of the upgrader's fee
2281
            // gobblers. The point is that this transaction is a SingleProof
2282
            // transaction, so test that.
2283

2284
            let (merged_txid, _) = main_loop_handler
2285
                .global_state_lock
2286
                .lock_guard()
2287
                .await
2288
                .mempool
2289
                .get_sorted_iter()
2290
                .next_back()
2291
                .expect("mempool should contain one item here");
2292

2293
            assert!(
2294
                matches!(
2295
                    main_loop_handler
2296
                        .global_state_lock
2297
                        .lock_guard()
2298
                        .await
2299
                        .mempool
2300
                        .get(merged_txid)
2301
                        .unwrap()
2302
                        .proof,
2303
                    TransactionProof::SingleProof(_)
2304
                ),
2305
                "Proof in mempool must now be of type single proof"
2306
            );
2307

2308
            match main_to_peer_rx.recv().await {
2309
                Ok(MainToPeerTask::TransactionNotification(tx_noti)) => {
2310
                    assert_eq!(merged_txid, tx_noti.txid);
2311
                    assert_eq!(TransactionProofQuality::SingleProof, tx_noti.proof_quality);
2312
                },
2313
                other => panic!("Must have sent transaction notification to peer loop after successful proof upgrade. Got:\n{other:?}"),
2314
            }
2315
        }
2316
    }
2317

2318
    mod peer_discovery {
2319
        use super::*;
2320

2321
        #[apply(shared_tokio_runtime)]
2322
        #[traced_test]
2323
        async fn prune_peers_too_many_connections() {
2324
            let num_init_peers_outgoing = 10;
2325
            let num_init_peers_incoming = 4;
2326
            let TestSetup {
2327
                mut main_loop_handler,
2328
                mut main_to_peer_rx,
2329
                ..
2330
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
2331

2332
            let mocked_cli = cli_args::Args {
2333
                max_num_peers: num_init_peers_outgoing as usize,
2334
                ..Default::default()
2335
            };
2336

2337
            main_loop_handler
2338
                .global_state_lock
2339
                .set_cli(mocked_cli)
2340
                .await;
2341

2342
            main_loop_handler.prune_peers().await.unwrap();
2343
            assert_eq!(4, main_to_peer_rx.len());
2344
            for _ in 0..4 {
2345
                let peer_msg = main_to_peer_rx.recv().await.unwrap();
2346
                assert!(matches!(peer_msg, MainToPeerTask::Disconnect(_)))
2347
            }
2348
        }
2349

2350
        #[apply(shared_tokio_runtime)]
2351
        #[traced_test]
2352
        async fn prune_peers_not_too_many_connections() {
2353
            let num_init_peers_outgoing = 10;
2354
            let num_init_peers_incoming = 1;
2355
            let TestSetup {
2356
                mut main_loop_handler,
2357
                main_to_peer_rx,
2358
                ..
2359
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
2360

2361
            let mocked_cli = cli_args::Args {
2362
                max_num_peers: 200,
2363
                ..Default::default()
2364
            };
2365

2366
            main_loop_handler
2367
                .global_state_lock
2368
                .set_cli(mocked_cli)
2369
                .await;
2370

2371
            main_loop_handler.prune_peers().await.unwrap();
2372
            assert!(main_to_peer_rx.is_empty());
2373
        }
2374

2375
        #[apply(shared_tokio_runtime)]
2376
        #[traced_test]
2377
        async fn skip_peer_discovery_if_peer_limit_is_exceeded() {
2378
            let num_init_peers_outgoing = 2;
2379
            let num_init_peers_incoming = 0;
2380
            let TestSetup {
2381
                mut main_loop_handler,
2382
                ..
2383
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
2384

2385
            let mocked_cli = cli_args::Args {
2386
                max_num_peers: 0,
2387
                ..Default::default()
2388
            };
2389
            main_loop_handler
2390
                .global_state_lock
2391
                .set_cli(mocked_cli)
2392
                .await;
2393
            let mut mutable_state = main_loop_handler.mutable();
2394
            main_loop_handler
2395
                .discover_peers(&mut mutable_state)
2396
                .await
2397
                .unwrap();
2398

2399
            assert!(logs_contain("Skipping peer discovery."));
2400
        }
2401

2402
        #[apply(shared_tokio_runtime)]
2403
        #[traced_test]
2404
        async fn performs_peer_discovery_on_few_connections() {
2405
            let num_init_peers_outgoing = 2;
2406
            let num_init_peers_incoming = 0;
2407
            let TestSetup {
2408
                mut main_loop_handler,
2409
                mut main_to_peer_rx,
2410
                ..
2411
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
2412

2413
            // Set CLI to attempt to make more connections
2414
            let mocked_cli = cli_args::Args {
2415
                max_num_peers: 10,
2416
                ..Default::default()
2417
            };
2418
            main_loop_handler
2419
                .global_state_lock
2420
                .set_cli(mocked_cli)
2421
                .await;
2422
            let mut mutable_state = main_loop_handler.mutable();
2423
            main_loop_handler
2424
                .discover_peers(&mut mutable_state)
2425
                .await
2426
                .unwrap();
2427

2428
            let peer_discovery_sent_messages_on_peer_channel = main_to_peer_rx.try_recv().is_ok();
2429
            assert!(peer_discovery_sent_messages_on_peer_channel);
2430
            assert!(logs_contain("Performing peer discovery"));
2431
        }
2432
    }
2433

2434
    #[test]
2435
    fn older_systemtime_ranks_first() {
2436
        let start = UNIX_EPOCH;
2437
        let other = UNIX_EPOCH + Duration::from_secs(1000);
2438
        let mut instants = [start, other];
2439

2440
        assert_eq!(
2441
            start,
2442
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
2443
        );
2444

2445
        instants.reverse();
2446

2447
        assert_eq!(
2448
            start,
2449
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
2450
        );
2451
    }
2452
    mod bootstrapper_mode {
2453

2454
        use rand::Rng;
2455

2456
        use super::*;
2457
        use crate::models::peer::PeerMessage;
2458
        use crate::models::peer::TransferConnectionStatus;
2459
        use crate::tests::shared::get_dummy_peer_connection_data_genesis;
2460
        use crate::tests::shared::to_bytes;
2461


        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn disconnect_from_oldest_peer_upon_connection_request() {
            // Set up a node in bootstrapper mode, connected to one fewer peer
            // than the maximum. Initiate an incoming connection request and
            // verify that the oldest of the existing connections is dropped.

            let network = Network::Main;
            let num_init_peers_outgoing = 5;
            let num_init_peers_incoming = 0;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;

            let mocked_cli = cli_args::Args {
                max_num_peers: usize::from(num_init_peers_outgoing) + 1,
                bootstrap: true,
                network,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            let mut mutable_main_loop_state = main_loop_handler.mutable();

            // Sanity check: at startup, we are connected to the initial
            // number of peers.
            assert_eq!(
                usize::from(num_init_peers_outgoing),
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .peer_map
                    .len()
            );

            // Randomize the "connection established" timestamps, so that the
            // oldest connection is not simply the first one inserted by
            // `setup`.
            let mut rng = rand::rng();
            let now = SystemTime::now();
            let now_as_unix_timestamp = now.duration_since(UNIX_EPOCH).unwrap();
            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .net
                .peer_map
                .iter_mut()
                .for_each(|(_socket_address, peer_info)| {
                    peer_info.set_connection_established(
                        UNIX_EPOCH
                            + Duration::from_millis(
                                rng.random_range(0..(now_as_unix_timestamp.as_millis() as u64)),
                            ),
                    );
                });
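
            // All timestamps now lie in [UNIX_EPOCH, now), so the peer map
            // has a well-defined oldest connection (unique with overwhelming
            // probability, given millisecond-range randomness).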

            // Compute which peer will be dropped, for later reference.
            let expected_drop_peer_socket_address = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .net
                .peer_map
                .iter()
                .min_by(|l, r| {
                    l.1.connection_established()
                        .cmp(&r.1.connection_established())
                })
                .map(|(socket_address, _peer_info)| socket_address)
                .copied()
                .unwrap();
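
            // This mirrors the selection the main loop is expected to make
            // when handling `DisconnectFromLongestLivedPeer`: the peer whose
            // `connection_established()` timestamp is smallest.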

            // Simulate an incoming connection.
            let (peer_handshake_data, peer_socket_address) =
                get_dummy_peer_connection_data_genesis(network, 1);
            let own_handshake_data = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .get_own_handshakedata();
            assert_eq!(peer_handshake_data.network, own_handshake_data.network);
            assert_eq!(peer_handshake_data.version, own_handshake_data.version);
            let mock_stream = tokio_test::io::Builder::new()
                .read(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_REQUEST.to_vec(),
                        peer_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_RESPONSE.to_vec(),
                        own_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::ConnectionStatus(
                        TransferConnectionStatus::Accepted,
                    ))
                    .unwrap(),
                )
                .build();
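
            // With `tokio_test::io::Builder`, `read(...)` queues bytes for
            // the node to read, as if sent by the peer, while `write(...)`
            // asserts the exact bytes the node must write back. The script
            // above thus encodes: incoming handshake request, expected
            // handshake response, and an expected "connection accepted"
            // status reply.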
2572
            let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone();
2573
            let global_state_lock_clone = main_loop_handler.global_state_lock.clone();
2574
            let (_main_to_peer_tx_mock, main_to_peer_rx_mock) = tokio::sync::broadcast::channel(10);
2575
            let incoming_peer_task_handle = tokio::task::Builder::new()
2576
                .name("answer_peer_wrapper")
2577
                .spawn(async move {
2578
                    match answer_peer(
2579
                        mock_stream,
2580
                        global_state_lock_clone,
2581
                        peer_socket_address,
2582
                        main_to_peer_rx_mock,
2583
                        peer_to_main_tx_clone,
2584
                        own_handshake_data,
2585
                    )
2586
                    .await
2587
                    {
2588
                        Ok(()) => (),
2589
                        Err(err) => error!("Got error: {:?}", err),
2590
                    }
2591
                })
2592
                .unwrap();
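
            // The spawned task drives the handshake against the mock stream
            // concurrently with the assertions below; the test communicates
            // with it only through the channels set up above.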

            // `answer_peer_wrapper` should send a
            // `DisconnectFromLongestLivedPeer` message to main.
            let peer_to_main_message = main_loop_handler.peer_task_to_main_rx.recv().await.unwrap();
            assert!(matches!(
                peer_to_main_message,
                PeerTaskToMain::DisconnectFromLongestLivedPeer,
            ));

            // Process this message.
            main_loop_handler
                .handle_peer_task_message(peer_to_main_message, &mut mutable_main_loop_state)
                .await
                .unwrap();

            // The main loop should send a `Disconnect` message.
            let main_to_peers_message = main_to_peer_rx.recv().await.unwrap();
            let MainToPeerTask::Disconnect(observed_drop_peer_socket_address) =
                main_to_peers_message
            else {
                panic!("Expected disconnect, got {main_to_peers_message:?}");
            };

            // Match the observed droppee against the expectation.
            assert_eq!(
                expected_drop_peer_socket_address,
                observed_drop_peer_socket_address,
            );
            println!("Dropped connection with {expected_drop_peer_socket_address}.");

            // Don't forget to terminate the peer task, which is still running.
            incoming_peer_task_handle.abort();
        }
    }
}