Neptune-Crypto / neptune-core / build 14784987429 (push, via github)

01 May 2025 10:53PM UTC coverage: 74.963% (-0.1%) from 75.081%

dan-da
chore: store db backups in migration_backups dir

purpose: store db backups (taken prior to schema migration) in a
sub-directory "migration_backups" instead of in the same directory
as the in-use databases.

This directory is intended to be shared by all DBs that we use.

It is *not* intended to be shared by backups for other purposes.

Some refactoring of WalletConfiguration and DataDirectory was
required to do this cleanly, especially as this directory is not
specific to the wallet database.
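
A minimal sketch of the resulting layout (helper name assumed for
illustration; the actual refactor lives in WalletConfiguration and
DataDirectory):

use std::path::{Path, PathBuf};

/// Pre-migration backups for *all* databases share one
/// "migration_backups" sub-directory of the data directory,
/// instead of sitting next to the in-use database files.
fn migration_backup_dir(data_dir: &Path) -> PathBuf {
    data_dir.join("migration_backups")
}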

36 of 37 new or added lines in 3 files covered. (97.3%)

201 existing lines in 8 files now uncovered.

26129 of 34856 relevant lines covered (74.96%)

294449.43 hits per line

Source File: /src/main_loop.rs (64.1% covered)
pub mod proof_upgrader;

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use anyhow::Result;
use itertools::Itertools;
use proof_upgrader::get_upgrade_task_from_mempool;
use proof_upgrader::UpdateMutatorSetDataJob;
use proof_upgrader::UpgradeJob;
use rand::prelude::IteratorRandom;
use rand::seq::IndexedRandom;
use tokio::net::TcpListener;
use tokio::select;
use tokio::signal;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time;
use tokio::time::Instant;
use tokio::time::MissedTickBehavior;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use crate::connect_to_peers::answer_peer;
use crate::connect_to_peers::call_peer;
use crate::job_queue::triton_vm::vm_job_queue;
use crate::job_queue::triton_vm::TritonVmJobPriority;
use crate::job_queue::triton_vm::TritonVmJobQueue;
use crate::macros::fn_name;
use crate::macros::log_slow_scope;
use crate::models::blockchain::block::block_header::BlockHeader;
use crate::models::blockchain::block::block_height::BlockHeight;
use crate::models::blockchain::block::difficulty_control::ProofOfWork;
use crate::models::blockchain::block::Block;
use crate::models::blockchain::transaction::Transaction;
use crate::models::blockchain::transaction::TransactionProof;
use crate::models::channel::MainToMiner;
use crate::models::channel::MainToPeerTask;
use crate::models::channel::MainToPeerTaskBatchBlockRequest;
use crate::models::channel::MinerToMain;
use crate::models::channel::PeerTaskToMain;
use crate::models::channel::RPCServerToMain;
use crate::models::peer::handshake_data::HandshakeData;
use crate::models::peer::peer_info::PeerInfo;
use crate::models::peer::transaction_notification::TransactionNotification;
use crate::models::peer::PeerSynchronizationState;
use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
use crate::models::state::block_proposal::BlockProposal;
use crate::models::state::mempool::TransactionOrigin;
use crate::models::state::networking_state::SyncAnchor;
use crate::models::state::tx_proving_capability::TxProvingCapability;
use crate::models::state::GlobalState;
use crate::models::state::GlobalStateLock;
use crate::SUCCESS_EXIT_CODE;

const PEER_DISCOVERY_INTERVAL: Duration = Duration::from_secs(2 * 60);
const SYNC_REQUEST_INTERVAL: Duration = Duration::from_secs(3);
const MEMPOOL_PRUNE_INTERVAL: Duration = Duration::from_secs(30 * 60);
const MP_RESYNC_INTERVAL: Duration = Duration::from_secs(59);
const EXPECTED_UTXOS_PRUNE_INTERVAL: Duration = Duration::from_secs(19 * 60);

/// Interval for when transaction-upgrade checker is run. Note that this does
/// *not* define how often a transaction-proof upgrade is actually performed.
/// Only how often we check if we're ready to perform an upgrade.
const TRANSACTION_UPGRADE_CHECK_INTERVAL: Duration = Duration::from_secs(60);

const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;

/// Number of seconds within which an individual peer is expected to respond
/// to a synchronization request.
const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT: Duration =
    Duration::from_secs(SYNC_REQUEST_INTERVAL.as_secs() * SANCTION_PEER_TIMEOUT_FACTOR);

/// Number of seconds that a synchronization may run without any progress.
const GLOBAL_SYNCHRONIZATION_TIMEOUT: Duration =
    Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT.as_secs() * 4);
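// Note (editorial, derived from the constants above): an individual peer
// times out after SYNC_REQUEST_INTERVAL * SANCTION_PEER_TIMEOUT_FACTOR =
// 3 s * 40 = 120 s, and the global synchronization timeout is
// 120 s * 4 = 480 s.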

const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;
const TX_UPDATER_CHANNEL_CAPACITY: usize = 1;

/// Wraps a transmission channel.
///
/// To be used for the transmission channel to the miner, because
///  a) the miner might not exist in which case there would be no-one to empty
///     the channel; and
///  b) contrary to other channels, transmission failures here are not critical.
#[derive(Debug)]
struct MainToMinerChannel(Option<mpsc::Sender<MainToMiner>>);

impl MainToMinerChannel {
    /// Send a message to the miner task (if any).
    fn send(&self, message: MainToMiner) {
        // Do not use the async `send` function because it blocks until there
        // is spare capacity on the channel. Messages to the miner are not
        // critical so if there is no capacity left, just log an error
        // message.
        if let Some(channel) = &self.0 {
            if let Err(e) = channel.try_send(message) {
                error!("Failed to send pause message to miner thread:\n{e}");
            }
        }
    }
}

/// MainLoop is the immutable part of the input for the main loop function
#[derive(Debug)]
pub struct MainLoopHandler {
    incoming_peer_listener: TcpListener,
    global_state_lock: GlobalStateLock,

    // note: broadcast::Sender::send() does not block
    main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,

    // note: mpsc::Sender::send() blocks if channel full.
    // locks should not be held across it.
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,

    // note: MainToMinerChannel::send() does not block.  might log error.
    main_to_miner_tx: MainToMinerChannel,

    peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
    miner_to_main_rx: mpsc::Receiver<MinerToMain>,
    rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
    task_handles: Vec<JoinHandle<()>>,

    #[cfg(test)]
    mock_now: Option<SystemTime>,
}

/// The mutable part of the main loop function
struct MutableMainLoopState {
    /// Information used to batch-download blocks.
    sync_state: SyncState,

    /// Information about potential peers for new connections.
    potential_peers: PotentialPeersState,

    /// A list of join-handles to spawned tasks.
    task_handles: Vec<JoinHandle<()>>,

    /// A join-handle to a task performing transaction-proof upgrades.
    proof_upgrader_task: Option<JoinHandle<()>>,

    /// A join-handle to a task running the update of the mempool transactions.
    update_mempool_txs_handle: Option<JoinHandle<()>>,

    /// A channel that the task updating mempool transactions can use to
    /// communicate its result.
    update_mempool_receiver: mpsc::Receiver<Vec<Transaction>>,
}

impl MutableMainLoopState {
    fn new(task_handles: Vec<JoinHandle<()>>) -> Self {
        let (_dummy_sender, dummy_receiver) =
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);
        Self {
            sync_state: SyncState::default(),
            potential_peers: PotentialPeersState::default(),
            task_handles,
            proof_upgrader_task: None,
            update_mempool_txs_handle: None,
            update_mempool_receiver: dummy_receiver,
        }
    }
}

/// handles batch-downloading of blocks if we are more than n blocks behind
#[derive(Default, Debug)]
struct SyncState {
    peer_sync_states: HashMap<SocketAddr, PeerSynchronizationState>,
    last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>,
}

impl SyncState {
    fn record_request(
        &mut self,
        requested_block_height: BlockHeight,
        peer: SocketAddr,
        now: SystemTime,
    ) {
        self.last_sync_request = Some((now, requested_block_height, peer));
    }

    /// Return a list of peers that have reported to be in possession of blocks
    /// with a PoW above a threshold.
    fn get_potential_peers_for_sync_request(&self, threshold_pow: ProofOfWork) -> Vec<SocketAddr> {
        self.peer_sync_states
            .iter()
            .filter(|(_sa, sync_state)| sync_state.claimed_max_pow > threshold_pow)
            .map(|(sa, _)| *sa)
            .collect()
    }

    /// Determine if a peer should be sanctioned for failing to respond to a
    /// synchronization request fast enough. Also determine if a new request
    /// should be made or the previous one should be allowed to run for longer.
    ///
    /// Returns (peer to be sanctioned, attempt new request).
    fn get_status_of_last_request(
        &self,
        current_block_height: BlockHeight,
        now: SystemTime,
    ) -> (Option<SocketAddr>, bool) {
        // A peer is sanctioned if no answer has been received after N times the sync request
        // interval.
        match self.last_sync_request {
            None => {
                // No sync request has been made since startup of program
                (None, true)
            }
            Some((req_time, requested_height, peer_sa)) => {
                if requested_height < current_block_height {
                    // The last sync request updated the state
                    (None, true)
                } else if req_time + INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT < now {
                    // The last sync request was not answered, sanction peer
                    // and make a new sync request.
                    (Some(peer_sa), true)
                } else {
                    // The last sync request has not yet been answered. But it has
                    // not timed out yet.
                    (None, false)
                }
            }
        }
    }
}

/// holds information about a potential peer in the process of peer discovery
struct PotentialPeerInfo {
    _reported: SystemTime,
    _reported_by: SocketAddr,
    instance_id: u128,
    distance: u8,
}

impl PotentialPeerInfo {
    fn new(reported_by: SocketAddr, instance_id: u128, distance: u8, now: SystemTime) -> Self {
        Self {
            _reported: now,
            _reported_by: reported_by,
            instance_id,
            distance,
        }
    }
}

/// holds information about a set of potential peers in the process of peer discovery
struct PotentialPeersState {
    potential_peers: HashMap<SocketAddr, PotentialPeerInfo>,
}

impl PotentialPeersState {
    fn default() -> Self {
        Self {
            potential_peers: HashMap::new(),
        }
    }

    fn add(
        &mut self,
        reported_by: SocketAddr,
        potential_peer: (SocketAddr, u128),
        max_peers: usize,
        distance: u8,
        now: SystemTime,
    ) {
        let potential_peer_socket_address = potential_peer.0;
        let potential_peer_instance_id = potential_peer.1;

        // This check *should* make it likely that a potential peer is always
        // registered with the lowest observed distance.
        if self
            .potential_peers
            .contains_key(&potential_peer_socket_address)
        {
            return;
        }

        // If this data structure is full, remove a random entry. Then add this.
        if self.potential_peers.len()
            > max_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS
        {
            let mut rng = rand::rng();
            let random_potential_peer = self
                .potential_peers
                .keys()
                .choose(&mut rng)
                .unwrap()
                .to_owned();
            self.potential_peers.remove(&random_potential_peer);
        }

        let insert_value =
            PotentialPeerInfo::new(reported_by, potential_peer_instance_id, distance, now);
        self.potential_peers
            .insert(potential_peer_socket_address, insert_value);
    }
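    // Note (editorial, derived from the eviction check above): the map holds
    // at most about max_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS
    // entries, e.g. roughly 200 potential peers when max_peers is 10.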

    /// Return a peer from the potential peer list that we aren't connected to
    /// and that isn't our own address.
    ///
    /// Favors peers with a high distance and with IPs that we are not already
    /// connected to.
    ///
    /// Returns (socket address, peer distance)
    fn get_candidate(
        &self,
        connected_clients: &[PeerInfo],
        own_instance_id: u128,
    ) -> Option<(SocketAddr, u8)> {
        let peers_instance_ids: Vec<u128> =
            connected_clients.iter().map(|x| x.instance_id()).collect();

        // Only pick those peers that report a listening port
        let peers_listen_addresses: Vec<SocketAddr> = connected_clients
            .iter()
            .filter_map(|x| x.listen_address())
            .collect();

        // Find the appropriate candidates
        let candidates = self
            .potential_peers
            .iter()
            // Prevent connecting to self. Note that we *only* use instance ID to prevent this,
            // meaning this will allow multiple nodes e.g. running on the same computer to form
            // a complete graph.
            .filter(|pp| pp.1.instance_id != own_instance_id)
            // Prevent connecting to peer we already are connected to
            .filter(|potential_peer| !peers_instance_ids.contains(&potential_peer.1.instance_id))
            .filter(|potential_peer| !peers_listen_addresses.contains(potential_peer.0))
            .collect::<Vec<_>>();

        // Prefer candidates with IPs that we are not already connected to but
        // connect to repeated IPs in case we don't have other options, as
        // repeated IPs may just be multiple machines on the same NAT'ed IPv4
        // address.
        let mut connected_ips = peers_listen_addresses.into_iter().map(|x| x.ip());
        let candidates = if candidates
            .iter()
            .any(|candidate| !connected_ips.contains(&candidate.0.ip()))
        {
            candidates
                .into_iter()
                .filter(|candidate| !connected_ips.contains(&candidate.0.ip()))
                .collect()
        } else {
            candidates
        };

        // Get the candidate list with the highest distance
        let max_distance_candidates = candidates.iter().max_by_key(|pp| pp.1.distance);

        // Pick a random candidate from the appropriate candidates
        let mut rng = rand::rng();
        max_distance_candidates
            .iter()
            .choose(&mut rng)
            .map(|x| (x.0.to_owned(), x.1.distance))
    }
}

/// Return a boolean indicating whether the client should stay in
/// synchronization mode.
fn stay_in_sync_mode(
    own_block_tip_header: &BlockHeader,
    sync_state: &SyncState,
    sync_mode_threshold: usize,
) -> bool {
    let max_claimed_pow = sync_state
        .peer_sync_states
        .values()
        .max_by_key(|x| x.claimed_max_pow);
    match max_claimed_pow {
        None => false, // No peer has passed the sync challenge phase.

        // Synchronization is left when the remaining number of blocks is at
        // most half of what has been indicated to fit into RAM
        Some(max_claim) => {
            own_block_tip_header.cumulative_proof_of_work < max_claim.claimed_max_pow
                && max_claim.claimed_max_height - own_block_tip_header.height
                    > sync_mode_threshold as i128 / 2
        }
    }
}

impl MainLoopHandler {
    // todo: find a way to avoid triggering lint
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        incoming_peer_listener: TcpListener,
        global_state_lock: GlobalStateLock,
        main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
        peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
        main_to_miner_tx: mpsc::Sender<MainToMiner>,

        peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
        miner_to_main_rx: mpsc::Receiver<MinerToMain>,
        rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
        task_handles: Vec<JoinHandle<()>>,
    ) -> Self {
        let maybe_main_to_miner_tx = if global_state_lock.cli().mine() {
            Some(main_to_miner_tx)
        } else {
            None
        };
        Self {
            incoming_peer_listener,
            global_state_lock,
            main_to_miner_tx: MainToMinerChannel(maybe_main_to_miner_tx),
            main_to_peer_broadcast_tx,
            peer_task_to_main_tx,

            peer_task_to_main_rx,
            miner_to_main_rx,
            rpc_server_to_main_rx,
            task_handles,

            #[cfg(test)]
            mock_now: None,
        }
    }

    pub fn global_state_lock(&mut self) -> GlobalStateLock {
        self.global_state_lock.clone()
    }

    /// Allows for mocked timestamps such that time dependencies may be tested.
    #[cfg(test)]
    fn with_mocked_time(mut self, mocked_time: SystemTime) -> Self {
        self.mock_now = Some(mocked_time);
        self
    }

    fn now(&self) -> SystemTime {
        #[cfg(not(test))]
        {
            SystemTime::now()
        }
        #[cfg(test)]
        {
            self.mock_now.unwrap_or(SystemTime::now())
        }
    }

    /// Run a list of Triton VM prover jobs that update the mutator set state
    /// for transactions.
    ///
    /// Sends the result back through the provided channel.
    async fn update_mempool_jobs(
        update_jobs: Vec<UpdateMutatorSetDataJob>,
        job_queue: Arc<TritonVmJobQueue>,
        transaction_update_sender: mpsc::Sender<Vec<Transaction>>,
        proof_job_options: TritonVmProofJobOptions,
    ) {
        debug!(
            "Attempting to update transaction proofs of {} transactions",
            update_jobs.len()
        );
        let mut result = vec![];
        for job in update_jobs {
            // Jobs for updating txs in the mempool have highest priority since
            // they block the composer from continuing.
            // TODO: Handle errors better here.
            let job_result = job
                .upgrade(job_queue.clone(), proof_job_options.clone())
                .await
                .unwrap();
            result.push(job_result);
        }

        transaction_update_sender
            .send(result)
            .await
            .expect("Receiver for updated txs in main loop must still exist");
    }

    /// Handles a list of transactions whose proof has been updated with new
    /// mutator set data.
    async fn handle_updated_mempool_txs(&mut self, updated_txs: Vec<Transaction>) {
        // Update mempool with updated transactions
        {
            let mut state = self.global_state_lock.lock_guard_mut().await;
            for updated in &updated_txs {
                let txid = updated.kernel.txid();
                if let Some(tx) = state.mempool.get_mut(txid) {
                    *tx = updated.to_owned();
                } else {
                    warn!("Updated transaction which is no longer in mempool");
                }
            }
        }

        // Then notify all peers
        for updated in updated_txs {
            let pmsg = MainToPeerTask::TransactionNotification((&updated).try_into().unwrap());
            self.main_to_peer_broadcast(pmsg);
        }

        // Tell miner that it can now start composing next block.
        self.main_to_miner_tx.send(MainToMiner::Continue);
    }

    /// Process a block whose PoW solution was solved by this client (or an
    /// external program) and has not been seen by the rest of the network yet.
    ///
    /// Shares block with all connected peers, updates own state, and updates
    /// any mempool transactions to be valid under this new block.
    ///
    /// Locking:
    ///  * acquires `global_state_lock` for read and write
    async fn handle_self_guessed_block(
        &mut self,
        main_loop_state: &mut MutableMainLoopState,
        new_block: Box<Block>,
    ) -> Result<()> {
        let new_block_hash = new_block.hash();

        // clone block in advance, so lock is held less time.
        // note that this clone is wasted if block is not more canonical
        // but that should be the less common case.
        //
        // perf: in the future we should use Arc systematically to avoid these
        // expensive block clones.
        let new_block_clone = (*new_block).clone();

        // important!  the is_canonical check and set_new_tip() need to be an
        // atomic operation, ie called within the same write-lock acquisition.
        //
        // this avoids a race condition where block B and C are both more
        // canonical than A, but B is more than C, yet C replaces B because it
        // was only checked against A.
        //
        // we release the lock as quickly as possible.
        let update_jobs = {
            let mut gsm = self.global_state_lock.lock_guard_mut().await;

            // bail out if incoming block is not more canonical than present tip.
            if !gsm.incoming_block_is_more_canonical(&new_block) {
                drop(gsm); // drop lock right away before send.
                warn!("Got new block from miner that was not child of tip. Discarding.");
                self.main_to_miner_tx.send(MainToMiner::Continue);
                return Ok(());
            }

            // set new tip and obtain list of update-jobs to perform.
            // the jobs update mutator-set data for:
            //   all tx if we are in composer role.
            //   else self-owned tx.
            // see: Mempool::update_with_block_and_predecessor()
            gsm.set_new_tip(new_block_clone).await?
        }; // write-lock is dropped here.

        // Share block with peers right away.
        let pmsg = MainToPeerTask::Block(new_block);
        self.main_to_peer_broadcast(pmsg);

        info!("Locally-mined block is new tip: {}", new_block_hash);
        info!("broadcasting new block to peers");

        self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

        Ok(())
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_miner_task_message(
        &mut self,
        msg: MinerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<Option<i32>> {
        match msg {
            MinerToMain::NewBlockFound(new_block_info) => {
                log_slow_scope!(fn_name!() + "::MinerToMain::NewBlockFound");

                let new_block = new_block_info.block;

                info!("Miner found new block: {}", new_block.kernel.header.height);
                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
            }
            MinerToMain::BlockProposal(boxed_proposal) => {
                let (block, expected_utxos) = *boxed_proposal;

                // If block proposal from miner does not build on current tip,
                // don't broadcast it. This check covers reorgs as well.
                let current_tip = self
                    .global_state_lock
                    .lock_guard()
                    .await
                    .chain
                    .light_state()
                    .clone();
                if block.header().prev_block_digest != current_tip.hash() {
                    warn!(
                        "Got block proposal from miner that does not build on current tip. \
                           Rejecting. If this happens a lot, then maybe this machine is too \
                           slow to competitively compose blocks. Consider running the client only \
                           with the guesser flag set and not the compose flag."
                    );
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                // Ensure proposal validity before sharing
                if !block
                    .is_valid(
                        &current_tip,
                        block.header().timestamp,
                        self.global_state_lock.cli().network,
                    )
                    .await
                {
                    error!("Own block proposal invalid. This should not happen.");
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                if !self.global_state_lock.cli().secret_compositions {
                    let pmsg = MainToPeerTask::BlockProposalNotification((&block).into());
                    self.main_to_peer_broadcast(pmsg);
                }

                {
                    // Use block proposal and add expected UTXOs from this
                    // proposal.
                    let mut state = self.global_state_lock.lock_guard_mut().await;
                    state.mining_state.block_proposal =
                        BlockProposal::own_proposal(block.clone(), expected_utxos.clone());
                    state.wallet_state.add_expected_utxos(expected_utxos).await;
                }

                // Indicate to miner that block proposal was successfully
                // received by main-loop.
                self.main_to_miner_tx.send(MainToMiner::Continue);
            }
            MinerToMain::Shutdown(exit_code) => {
                return Ok(Some(exit_code));
            }
        }

        Ok(None)
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_peer_task_message(
        &mut self,
        msg: PeerTaskToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<()> {
        debug!("Received {} from a peer task", msg.get_type());
        let cli_args = self.global_state_lock.cli().clone();
        match msg {
            PeerTaskToMain::NewBlocks(blocks) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::NewBlocks");

                let last_block = blocks.last().unwrap().to_owned();
                let update_jobs = {
                    // The peer tasks also check this condition, if block is more canonical than current
                    // tip, but we have to check it again since the block update might have already been applied
                    // through a message from another peer (or from own miner).
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let new_canonical =
                        global_state_mut.incoming_block_is_more_canonical(&last_block);

                    if !new_canonical {
                        // The blocks are not canonical, but: if we are in sync
                        // mode and these blocks beat our current champion, then
                        // we store them anyway, without marking them as tip.
                        let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else {
                            warn!(
                                "Blocks were not new, and we're not syncing. Not storing blocks."
                            );
                            return Ok(());
                        };
                        if sync_anchor
                            .champion
                            .is_some_and(|(height, _)| height >= last_block.header().height)
                        {
                            warn!("Repeated blocks received in sync mode, not storing");
                            return Ok(());
                        }

                        sync_anchor.catch_up(last_block.header().height, last_block.hash());

                        for block in blocks {
                            global_state_mut.store_block_not_tip(block).await?;
                        }

                        return Ok(());
                    }

                    info!(
                        "Last block from peer is new canonical tip: {}; height: {}",
                        last_block.hash(),
                        last_block.header().height
                    );

                    // Ask miner to stop work until state update is completed
                    self.main_to_miner_tx.send(MainToMiner::WaitForContinue);

                    // Get out of sync mode if needed
                    if global_state_mut.net.sync_anchor.is_some() {
                        let stay_in_sync_mode = stay_in_sync_mode(
                            &last_block.kernel.header,
                            &main_loop_state.sync_state,
                            cli_args.sync_mode_threshold,
                        );
                        if !stay_in_sync_mode {
                            info!("Exiting sync mode");
                            global_state_mut.net.sync_anchor = None;
                            self.main_to_miner_tx.send(MainToMiner::StopSyncing);
                        }
                    }

                    let mut update_jobs: Vec<UpdateMutatorSetDataJob> = vec![];
                    for new_block in blocks {
                        debug!(
                            "Storing block {} in database. Height: {}, Mined: {}",
                            new_block.hash(),
                            new_block.kernel.header.height,
                            new_block.kernel.header.timestamp.standard_format()
                        );

                        // Potential race condition here.
                        // What if last block is new and canonical, but first
                        // block is already known then we'll store the same block
                        // twice. That should be OK though, as the appropriate
                        // database entries are simply overwritten with the new
                        // block info. See the
                        // [GlobalState::tests::setting_same_tip_twice_is_allowed]
                        // test for a test of this phenomenon.

                        let update_jobs_ = global_state_mut.set_new_tip(new_block).await?;
                        update_jobs.extend(update_jobs_);
                    }

                    update_jobs
                };

                // Inform all peers about new block
                let pmsg = MainToPeerTask::Block(Box::new(last_block.clone()));
                self.main_to_peer_broadcast(pmsg);

                // Spawn task to handle mempool tx-updating after new blocks.
                // TODO: Do clever trick to collapse all jobs relating to the same transaction,
                //       identified by transaction-ID, into *one* update job.
                self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

                // Inform miner about new block.
                self.main_to_miner_tx.send(MainToMiner::NewBlock);
            }
            PeerTaskToMain::AddPeerMaxBlockHeight {
                peer_address,
                claimed_height,
                claimed_cumulative_pow,
                claimed_block_mmra,
            } => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight");

                let claimed_state =
                    PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow);
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .insert(peer_address, claimed_state);

                // Check if synchronization mode should be activated.
                // Synchronization mode is entered if accumulated PoW exceeds
                // our tip and if the height difference is positive and beyond
                // a threshold value.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow)
                    && global_state_mut
                        .net
                        .sync_anchor
                        .as_ref()
                        .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow)
                {
                    info!(
                        "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}",
                        peer_address, claimed_height, claimed_cumulative_pow
                    );
                    global_state_mut.net.sync_anchor =
                        Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra));
                    self.main_to_miner_tx.send(MainToMiner::StartSyncing);
                }
            }
            PeerTaskToMain::RemovePeerMaxBlockHeight(socket_addr) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::RemovePeerMaxBlockHeight");

                debug!(
                    "Removing max block height from sync data structure for peer {}",
                    socket_addr
                );
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .remove(&socket_addr);

                // Get out of sync mode if needed.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;

                if global_state_mut.net.sync_anchor.is_some() {
                    let stay_in_sync_mode = stay_in_sync_mode(
                        global_state_mut.chain.light_state().header(),
                        &main_loop_state.sync_state,
                        cli_args.sync_mode_threshold,
                    );
                    if !stay_in_sync_mode {
                        info!("Exiting sync mode");
                        global_state_mut.net.sync_anchor = None;
                    }
                }
            }
            PeerTaskToMain::PeerDiscoveryAnswer((pot_peers, reported_by, distance)) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer");

                let max_peers = self.global_state_lock.cli().max_num_peers;
                for pot_peer in pot_peers {
                    main_loop_state.potential_peers.add(
                        reported_by,
                        pot_peer,
                        max_peers,
                        distance,
                        self.now(),
                    );
                }
            }
            PeerTaskToMain::Transaction(pt2m_transaction) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::Transaction");

                debug!(
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    pt2m_transaction.transaction.kernel.inputs.len(),
                    pt2m_transaction.transaction.kernel.outputs.len(),
                    pt2m_transaction.transaction.kernel.mutator_set_hash
                );

                {
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    if pt2m_transaction.confirmable_for_block
                        != global_state_mut.chain.light_state().hash()
                    {
                        warn!("main loop got unmined transaction with bad mutator set data, discarding transaction");
                        return Ok(());
                    }

                    // Insert into mempool
                    global_state_mut
                        .mempool_insert(
                            pt2m_transaction.transaction.to_owned(),
                            TransactionOrigin::Foreign,
                        )
                        .await;
                }

                // send notification to peers
                let transaction_notification: TransactionNotification =
                    (&pt2m_transaction.transaction).try_into()?;

                let pmsg = MainToPeerTask::TransactionNotification(transaction_notification);
                self.main_to_peer_broadcast(pmsg);
            }
            PeerTaskToMain::BlockProposal(block) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::BlockProposal");

                debug!("main loop received block proposal from peer loop");

                // Due to race-conditions, we need to verify that this
                // block proposal is still the immediate child of tip. If it is,
                // and it has a higher guesser fee than what we're currently
                // working on, then we switch to this, and notify the miner to
                // mine on this new block. We don't need to verify the block's
                // validity, since that was done in peer loop.
                // To ensure atomicity, a write-lock must be held over global
                // state while we check if this proposal is favorable.
                {
                    info!("Received new favorable block proposal for mining operation.");
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let verdict = global_state_mut.favor_incoming_block_proposal(
                        block.header().height,
                        block.total_guesser_reward(),
                    );
                    if let Err(reject_reason) = verdict {
                        warn!("main loop got unfavorable block proposal. Reason: {reject_reason}");
                        return Ok(());
                    }

                    global_state_mut.mining_state.block_proposal =
                        BlockProposal::foreign_proposal(*block.clone());
                }

                // Notify all peers of the block proposal we just accepted
                let pmsg = MainToPeerTask::BlockProposalNotification((&*block).into());
                self.main_to_peer_broadcast(pmsg);

                self.main_to_miner_tx.send(MainToMiner::NewBlockProposal);
            }
            PeerTaskToMain::DisconnectFromLongestLivedPeer => {
                let global_state = self.global_state_lock.lock_guard().await;

                // get all peers
                let all_peers = global_state.net.peer_map.iter();

                // filter out CLI peers
                let disconnect_candidates =
                    all_peers.filter(|p| !global_state.cli_peers().contains(p.0));

                // find the one with the oldest connection
                let longest_lived_peer = disconnect_candidates.min_by(
                    |(_socket_address_left, peer_info_left),
                     (_socket_address_right, peer_info_right)| {
                        peer_info_left
                            .connection_established()
                            .cmp(&peer_info_right.connection_established())
                    },
                );

                // tell to disconnect
                if let Some((peer_socket, _peer_info)) = longest_lived_peer {
                    let pmsg = MainToPeerTask::Disconnect(peer_socket.to_owned());
                    self.main_to_peer_broadcast(pmsg);
                }
            }
        }

        Ok(())
    }

    /// If necessary, disconnect from peers.
    ///
    /// While a reasonable effort is made to never have more connections than
    /// [`max_num_peers`](crate::config_models::cli_args::Args::max_num_peers),
    /// this is not guaranteed. For example, bootstrap nodes temporarily allow a
    /// surplus of incoming connections to provide their service more reliably.
    ///
    /// Never disconnects peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn prune_peers(&self) -> Result<()> {
        // fetch all relevant info from global state; don't hold the lock
        let cli_args = self.global_state_lock.cli();
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .values()
            .cloned()
            .collect_vec();

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;
        if num_peers <= max_num_peers {
            debug!("No need to prune any peer connections.");
            return Ok(());
        }
        warn!("Connected to {num_peers} peers, which exceeds the maximum ({max_num_peers}).");

        // If all connections are outbound, it's OK to exceed the max.
        if connected_peers.iter().all(|p| p.connection_is_outbound()) {
            warn!("Not disconnecting from any peer because all connections are outbound.");
            return Ok(());
        }

        let num_peers_to_disconnect = num_peers - max_num_peers;
        let peers_to_disconnect = connected_peers
            .into_iter()
            .filter(|peer| !cli_args.peers.contains(&peer.connected_address()))
            .choose_multiple(&mut rand::rng(), num_peers_to_disconnect);
        match peers_to_disconnect.len() {
            0 => warn!("Not disconnecting from any peer because of manual override."),
            i => info!("Disconnecting from {i} peers."),
        }
        for peer in peers_to_disconnect {
            let pmsg = MainToPeerTask::Disconnect(peer.connected_address());
            self.main_to_peer_broadcast(pmsg);
        }

        Ok(())
    }

    /// If necessary, reconnect to the peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn reconnect(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .keys()
            .copied()
            .collect_vec();
        let peers_with_lost_connection = self
            .global_state_lock
            .cli()
            .peers
            .iter()
            .filter(|peer| !connected_peers.contains(peer));

        // If no connection was lost, there's nothing to do.
        if peers_with_lost_connection.clone().count() == 0 {
            return Ok(());
        }

        // Else, try to reconnect.
        let own_handshake_data = self
            .global_state_lock
            .lock_guard()
            .await
            .get_own_handshakedata();
        for &peer_with_lost_connection in peers_with_lost_connection {
            // Disallow reconnection if peer is in bad standing
            let peer_standing = self
                .global_state_lock
                .lock_guard()
                .await
                .net
                .get_peer_standing_from_database(peer_with_lost_connection.ip())
                .await;
            if peer_standing.is_some_and(|standing| standing.is_bad()) {
                info!("Not reconnecting to peer in bad standing: {peer_with_lost_connection}");
                continue;
            }

            info!("Attempting to reconnect to peer: {peer_with_lost_connection}");
            let global_state_lock = self.global_state_lock.clone();
            let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
            let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
            let own_handshake_data = own_handshake_data.clone();
            let outgoing_connection_task = tokio::task::Builder::new()
                .name("call_peer_wrapper_1")
                .spawn(async move {
                    call_peer(
                        peer_with_lost_connection,
                        global_state_lock,
                        main_to_peer_broadcast_rx,
                        peer_task_to_main_tx,
                        own_handshake_data,
                        1, // All CLI-specified peers have distance 1
                    )
                    .await;
                })?;
            main_loop_state.task_handles.push(outgoing_connection_task);
            main_loop_state.task_handles.retain(|th| !th.is_finished());
        }

        Ok(())
    }

    /// Perform peer discovery.
    ///
    /// Peer discovery involves finding potential peers from connected peers
    /// and attempting to establish a connection with one of them.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn discover_peers(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        // fetch all relevant info from global state, then release the lock
        let cli_args = self.global_state_lock.cli();
        let global_state = self.global_state_lock.lock_guard().await;
        let connected_peers = global_state.net.peer_map.values().cloned().collect_vec();
        let own_instance_id = global_state.net.instance_id;
        let own_handshake_data = global_state.get_own_handshakedata();
        drop(global_state);

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;

        // Don't make an outgoing connection if
        // - the peer limit is reached (or exceeded), or
        // - the peer limit is _almost_ reached; reserve the last slot for an
        //   incoming connection.
        if num_peers >= max_num_peers || num_peers > 2 && num_peers - 1 == max_num_peers {
            info!("Connected to {num_peers} peers. The configured max is {max_num_peers} peers.");
            info!("Skipping peer discovery.");
            return Ok(());
        }

        info!("Performing peer discovery");

        // Ask all peers for their peer lists. This will eventually – once the
        // responses have come in – update the list of potential peers.
        let pmsg = MainToPeerTask::MakePeerDiscoveryRequest;
        self.main_to_peer_broadcast(pmsg);

        // Get a peer candidate from the list of potential peers. Generally,
        // the peer lists requested in the previous step will not have come in
        // yet. Therefore, the new candidate is selected based on somewhat
        // (but not overly) old information.
        let Some((peer_candidate, candidate_distance)) = main_loop_state
            .potential_peers
            .get_candidate(&connected_peers, own_instance_id)
        else {
            info!("Found no peer candidate to connect to. Not making new connection.");
            return Ok(());
        };

        // Try to connect to the selected candidate.
        info!("Connecting to peer {peer_candidate} with distance {candidate_distance}");
        let global_state_lock = self.global_state_lock.clone();
        let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
        let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
        let outgoing_connection_task = tokio::task::Builder::new()
            .name("call_peer_wrapper_2")
            .spawn(async move {
                call_peer(
                    peer_candidate,
                    global_state_lock,
                    main_to_peer_broadcast_rx,
                    peer_task_to_main_tx,
                    own_handshake_data,
                    candidate_distance,
                )
                .await;
            })?;
        main_loop_state.task_handles.push(outgoing_connection_task);
        main_loop_state.task_handles.retain(|th| !th.is_finished());

        // Immediately request the new peer's peer list. This allows
        // incorporating the new peer's peers into the list of potential peers,
        // to be used in the next round of peer discovery.
        let m2pmsg = MainToPeerTask::MakeSpecificPeerDiscoveryRequest(peer_candidate);
        self.main_to_peer_broadcast(m2pmsg);

        Ok(())
    }

    /// Return a list of block heights for a block-batch request.
    ///
    /// Returns an ordered list of the heights of *most preferred block*
    /// to build on, where current tip is always the most preferred block.
    ///
    /// Uses a factor to ensure that the peer will always have something to
    /// build on top of by providing potential starting points all the way
    /// back to genesis.
    fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec<BlockHeight> {
        const FACTOR: f64 = 1.07f64;

        let mut look_behind = 0;
        let mut ret = vec![];

        // A factor of 1.07 can look back ~1m blocks in 200 digests.
        while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 {
            let height = match own_tip_height.checked_sub(look_behind) {
                None => break,
                Some(height) if height.is_genesis() => break,
                Some(height) => height,
            };

            ret.push(height);
            look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64;
        }

        ret.push(BlockHeight::genesis());

        ret
    }
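    // Illustration (editorial, derived from the loop above): the look-behind
    // offsets start dense (0, 1, 2, ..., 14, 16, ...) and then grow by
    // roughly 7% per step, so the up-to-199 heights below the tip, plus
    // genesis, reach back on the order of a million blocks in 200 digests.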
1174

    /// Logic for requesting the batch-download of blocks from peers
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let global_state = self.global_state_lock.lock_guard().await;

        // Check if we are in sync mode
        let Some(anchor) = &global_state.net.sync_anchor else {
            return Ok(());
        };

        info!("Running sync");

        let (own_tip_hash, own_tip_height, own_cumulative_pow) = (
            global_state.chain.light_state().hash(),
            global_state.chain.light_state().kernel.header.height,
            global_state
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work,
        );

        // Check if sync mode has timed out entirely, in which case it should
        // be abandoned.
        let anchor = anchor.to_owned();
        if self.now().duration_since(anchor.updated)? > GLOBAL_SYNCHRONIZATION_TIMEOUT {
            warn!("Sync mode has timed out. Abandoning sync mode.");

            // Abandon attempt, and punish all peers claiming to serve these
            // blocks.
            drop(global_state);
            self.global_state_lock
                .lock_guard_mut()
                .await
                .net
                .sync_anchor = None;

            let peers_to_punish = main_loop_state
                .sync_state
                .get_potential_peers_for_sync_request(own_cumulative_pow);

            for peer in peers_to_punish {
                let pmsg = MainToPeerTask::PeerSynchronizationTimeout(peer);
                self.main_to_peer_broadcast(pmsg);
            }

            return Ok(());
        }

        let (peer_to_sanction, try_new_request): (Option<SocketAddr>, bool) = main_loop_state
            .sync_state
            .get_status_of_last_request(own_tip_height, self.now());

        // Sanction peer if they failed to respond
        if let Some(peer) = peer_to_sanction {
            let pmsg = MainToPeerTask::PeerSynchronizationTimeout(peer);
            self.main_to_peer_broadcast(pmsg);
        }

        if !try_new_request {
            info!("Waiting for last sync to complete.");
            return Ok(());
        }

        // Create the next request from the reported sync state.
        info!("Creating new sync request");

        // Pick a random peer that has reported to have relevant blocks
        let candidate_peers = main_loop_state
            .sync_state
            .get_potential_peers_for_sync_request(own_cumulative_pow);
        let chosen_peer = candidate_peers.choose(&mut rand::rng());
        assert!(
            chosen_peer.is_some(),
            "A synchronization candidate must be available for a request. \
            Otherwise, the data structure is in an invalid state and syncing should not be active"
        );

        let ordered_preferred_block_digests = match anchor.champion {
            Some((_height, digest)) => vec![digest],
            None => {
                // Find candidate-UCA digests based on a sparse distribution of
                // block heights skewed towards own tip height
                let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height);
                let mut ordered_preferred_block_digests = vec![];
                for height in request_heights {
                    let digest = global_state
                        .chain
                        .archival_state()
                        .archival_block_mmr
                        .ammr()
                        .get_leaf_async(height.into())
                        .await;
                    ordered_preferred_block_digests.push(digest);
                }
                ordered_preferred_block_digests
            }
        };

        // Send message to the relevant peer loop to request the blocks
        let chosen_peer = chosen_peer.unwrap();
        info!(
            "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}",
            chosen_peer, own_tip_hash, own_tip_height
        );
        let pmsg = MainToPeerTask::RequestBlockBatch(MainToPeerTaskBatchBlockRequest {
            peer_addr_target: *chosen_peer,
            known_blocks: ordered_preferred_block_digests,
            anchor_mmr: anchor.block_mmr.clone(),
        });
        self.main_to_peer_broadcast(pmsg);

        // Record that this request was sent to the peer
        let requested_block_height = own_tip_height.next();
        main_loop_state
            .sync_state
            .record_request(requested_block_height, *chosen_peer, self.now());

        Ok(())
    }

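    /// Editor's sketch, not part of the original source: the sync-timeout
    /// guard in `block_sync` reduces to a wall-clock comparison. The 180 s
    /// value here is a placeholder, not the real GLOBAL_SYNCHRONIZATION_TIMEOUT.
    #[allow(dead_code)]
    fn sync_timed_out_demo(anchor_updated: SystemTime, now: SystemTime) -> bool {
        const TIMEOUT: Duration = Duration::from_secs(180); // placeholder value
        // duration_since errs if the clock went backwards; this sketch treats
        // that case as "not timed out".
        now.duration_since(anchor_updated)
            .map(|elapsed| elapsed > TIMEOUT)
            .unwrap_or(false)
    }
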
    /// Scheduled task for upgrading the proofs of transactions in the mempool.
    ///
    /// Will either merge two transactions that are supported by single
    /// proofs, or upgrade a transaction proof of the type `ProofCollection`
    /// to `SingleProof`.
    ///
    /// All proving takes place in a spawned task such that it doesn't block
    /// the main loop. The MutableMainLoopState gets the JoinHandle of the
    /// spawned upgrade task such that its status can be inspected.
    async fn proof_upgrader(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        fn attempt_upgrade(
            global_state: &GlobalState,
            now: SystemTime,
            tx_upgrade_interval: Option<Duration>,
            main_loop_state: &MutableMainLoopState,
        ) -> Result<bool> {
            let duration_since_last_upgrade =
                now.duration_since(global_state.net.last_tx_proof_upgrade_attempt)?;
            let previous_upgrade_task_is_still_running = main_loop_state
                .proof_upgrader_task
                .as_ref()
                .is_some_and(|x| !x.is_finished());
            Ok(global_state.net.sync_anchor.is_none()
                && global_state.proving_capability() == TxProvingCapability::SingleProof
                && !previous_upgrade_task_is_still_running
                && tx_upgrade_interval
                    .is_some_and(|upgrade_interval| duration_since_last_upgrade > upgrade_interval))
        }

        trace!("Running proof upgrader scheduled task");

        // Check if it's time to run the proof-upgrader, and if we're capable
        // of upgrading a transaction proof.
        let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval();
        let (upgrade_candidate, tx_origin) = {
            let global_state = self.global_state_lock.lock_guard().await;
            let now = self.now();
            if !attempt_upgrade(&global_state, now, tx_upgrade_interval, main_loop_state)? {
                trace!("Not attempting upgrade.");
                return Ok(());
            }

            debug!("Attempting to run transaction-proof-upgrade");

            // Find a candidate for proof upgrade
            let Some((upgrade_candidate, tx_origin)) = get_upgrade_task_from_mempool(&global_state)
            else {
                debug!("Found no transaction-proof to upgrade");
                return Ok(());
            };

            (upgrade_candidate, tx_origin)
        };

        info!(
            "Attempting to upgrade transaction proofs of: {}",
            upgrade_candidate.affected_txids().iter().join("; ")
        );

        // Perform the upgrade, if we're not using the prover for anything else,
        // like mining, or proving our own transaction. Running the prover takes
        // a long time (minutes), so we spawn a task for this such that we do
        // not block the main loop.
        let vm_job_queue = vm_job_queue();
        let perform_ms_update_if_needed =
            self.global_state_lock.cli().proving_capability() == TxProvingCapability::SingleProof;

        let global_state_lock_clone = self.global_state_lock.clone();
        let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
        let proof_upgrader_task =
            tokio::task::Builder::new()
                .name("proof_upgrader")
                .spawn(async move {
                    upgrade_candidate
                        .handle_upgrade(
                            vm_job_queue,
                            tx_origin,
                            perform_ms_update_if_needed,
                            global_state_lock_clone,
                            main_to_peer_broadcast_tx_clone,
                        )
                        .await
                })?;

        main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);

        Ok(())
    }

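    /// Editor's sketch, not part of the original source: the "at most one
    /// background upgrade at a time" gate used above, isolated. Probing a
    /// stored JoinHandle with `is_finished` is non-blocking.
    #[allow(dead_code)]
    fn upgrade_task_is_running(task: Option<&JoinHandle<()>>) -> bool {
        // A handle that exists and has not yet finished means a previous
        // upgrade is still in flight.
        task.is_some_and(|handle| !handle.is_finished())
    }
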
    /// Post-processing when a new block has arrived. Spawns a task to update
    /// transactions in the mempool. Only when the spawned task has completed
    /// should the miner continue.
    fn spawn_mempool_txs_update_job(
        &self,
        main_loop_state: &mut MutableMainLoopState,
        update_jobs: Vec<UpdateMutatorSetDataJob>,
    ) {
        // job completion of the spawned task is communicated through the
        // `update_mempool_txs_handle` channel.
        let vm_job_queue = vm_job_queue();
        if let Some(handle) = main_loop_state.update_mempool_txs_handle.as_ref() {
            handle.abort();
        }
        let (update_sender, update_receiver) =
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);

        // note: if this task is cancelled, the job will continue
        // because TritonVmJobOptions::cancel_job_rx is None.
        // see how compose_task handles cancellation in mine_loop.
        let job_options = self
            .global_state_lock
            .cli()
            .proof_job_options(TritonVmJobPriority::Highest);
        main_loop_state.update_mempool_txs_handle = Some(
            tokio::task::Builder::new()
                .name("mempool tx ms-updater")
                .spawn(async move {
                    Self::update_mempool_jobs(
                        update_jobs,
                        vm_job_queue.clone(),
                        update_sender,
                        job_options,
                    )
                    .await
                })
                .unwrap(),
        );
        main_loop_state.update_mempool_receiver = update_receiver;
    }

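    /// Editor's sketch, not part of the original source: the spawn-and-report
    /// shape used above, reduced to its essentials. A worker task sends its
    /// result over an mpsc channel that the main loop later polls in `select!`.
    #[allow(dead_code)]
    async fn channel_handoff_demo() {
        let (tx, mut rx) = mpsc::channel::<Vec<u64>>(1);
        let worker = tokio::spawn(async move {
            // Stand-in for the long-running mutator-set update jobs.
            let _ = tx.send(vec![1, 2, 3]).await;
        });
        if let Some(result) = rx.recv().await {
            debug!("worker produced {} updated items", result.len());
        }
        let _ = worker.await;
    }
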
    pub async fn run(&mut self) -> Result<i32> {
        info!("Starting main loop");

        let task_handles = std::mem::take(&mut self.task_handles);

        // Handle incoming connections, messages from peer tasks, and messages from the mining task
        let mut main_loop_state = MutableMainLoopState::new(task_handles);

        // Set up various timers.
        //
        // The `MissedTickBehavior::Delay` is appropriate for tasks that don't
        // do anything meaningful if executed in quick succession. For example,
        // pruning stale information immediately after pruning stale information
        // is almost certainly a no-op.
        // Similarly, tasks performing network operations (e.g., peer discovery)
        // should probably not try to “catch up” if some ticks were missed.

        // Don't run peer discovery immediately at startup since outgoing
        // connections started from lib.rs may not have finished yet.
        let mut peer_discovery_interval = time::interval_at(
            Instant::now() + PEER_DISCOVERY_INTERVAL,
            PEER_DISCOVERY_INTERVAL,
        );
        peer_discovery_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut block_sync_interval = time::interval(SYNC_REQUEST_INTERVAL);
        block_sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut mempool_cleanup_interval = time::interval(MEMPOOL_PRUNE_INTERVAL);
        mempool_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut utxo_notification_cleanup_interval = time::interval(EXPECTED_UTXOS_PRUNE_INTERVAL);
        utxo_notification_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut mp_resync_interval = time::interval(MP_RESYNC_INTERVAL);
        mp_resync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut tx_proof_upgrade_interval = time::interval(TRANSACTION_UPGRADE_CHECK_INTERVAL);
        tx_proof_upgrade_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // Spawn tasks to monitor for SIGTERM, SIGINT, and SIGQUIT. These
        // signals are only used on Unix systems.
        let (tx_term, mut rx_term) = mpsc::channel::<()>(2);
        let (tx_int, mut rx_int) = mpsc::channel::<()>(2);
        let (tx_quit, mut rx_quit) = mpsc::channel::<()>(2);
        #[cfg(unix)]
        {
            use tokio::signal::unix::signal;
            use tokio::signal::unix::SignalKind;

            // Monitor for SIGTERM
            let mut sigterm = signal(SignalKind::terminate())?;
            tokio::task::Builder::new()
                .name("sigterm_handler")
                .spawn(async move {
                    if sigterm.recv().await.is_some() {
                        info!("Received SIGTERM");
                        tx_term.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGINT
            let mut sigint = signal(SignalKind::interrupt())?;
            tokio::task::Builder::new()
                .name("sigint_handler")
                .spawn(async move {
                    if sigint.recv().await.is_some() {
                        info!("Received SIGINT");
                        tx_int.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGQUIT
            let mut sigquit = signal(SignalKind::quit())?;
            tokio::task::Builder::new()
                .name("sigquit_handler")
                .spawn(async move {
                    if sigquit.recv().await.is_some() {
                        info!("Received SIGQUIT");
                        tx_quit.send(()).await.unwrap();
                    }
                })?;
        }

        #[cfg(not(unix))]
        drop((tx_term, tx_int, tx_quit));

        let exit_code: i32 = loop {
            select! {
                Ok(()) = signal::ctrl_c() => {
                    info!("Detected Ctrl+c signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Monitor for SIGTERM, SIGINT, and SIGQUIT.
                Some(_) = rx_term.recv() => {
                    info!("Detected SIGTERM signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_int.recv() => {
                    info!("Detected SIGINT signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_quit.recv() => {
                    info!("Detected SIGQUIT signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Handle incoming connections from peer
                Ok((stream, peer_address)) = self.incoming_peer_listener.accept() => {
                    // Return early if no incoming connections are accepted. Do
                    // not send application-handshake.
                    if self.global_state_lock.cli().disallow_all_incoming_peer_connections() {
                        warn!("Got incoming connection despite not accepting any. Ignoring");
                        continue;
                    }

                    let state = self.global_state_lock.lock_guard().await;
                    let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> = self.main_to_peer_broadcast_tx.subscribe();
                    let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = self.peer_task_to_main_tx.clone();
                    let own_handshake_data: HandshakeData = state.get_own_handshakedata();
                    let global_state_lock = self.global_state_lock.clone(); // bump arc refcount.
                    let incoming_peer_task_handle = tokio::task::Builder::new()
                        .name("answer_peer_wrapper")
                        .spawn(async move {
                        match answer_peer(
                            stream,
                            global_state_lock,
                            peer_address,
                            main_to_peer_broadcast_rx_clone,
                            peer_task_to_main_tx_clone,
                            own_handshake_data,
                        ).await {
                            Ok(()) => (),
                            Err(err) => error!("Got error: {:?}", err),
                        }
                    })?;
                    main_loop_state.task_handles.push(incoming_peer_task_handle);
                    main_loop_state.task_handles.retain(|th| !th.is_finished());
                }

                // Handle messages from peer tasks
                Some(msg) = self.peer_task_to_main_rx.recv() => {
                    debug!("Received message sent to main task.");
                    self.handle_peer_task_message(
                        msg,
                        &mut main_loop_state,
                    )
                    .await?
                }

                // Handle messages from miner task
                Some(main_message) = self.miner_to_main_rx.recv() => {
                    let exit_code = self.handle_miner_task_message(main_message, &mut main_loop_state).await?;

                    if let Some(exit_code) = exit_code {
                        break exit_code;
                    }
                }

                // Handle the completion of mempool tx-update jobs after new block.
                Some(ms_updated_transactions) = main_loop_state.update_mempool_receiver.recv() => {
                    self.handle_updated_mempool_txs(ms_updated_transactions).await;
                }

                // Handle messages from rpc server task
                Some(rpc_server_message) = self.rpc_server_to_main_rx.recv() => {
                    let shutdown_after_execution = self.handle_rpc_server_message(rpc_server_message.clone(), &mut main_loop_state).await?;
                    if shutdown_after_execution {
                        break SUCCESS_EXIT_CODE
                    }
                }

                // Handle peer discovery
                _ = peer_discovery_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::peer_discovery_interval");

                    // Check number of peers we are connected to and connect to
                    // more peers if needed.
                    debug!("Timer: peer discovery job");

                    // This check makes regtest mode behave in a local, controlled way
                    // because no regtest nodes attempt to discover each other, so the only
                    // peers are those that are manually added.
                    // see: https://github.com/Neptune-Crypto/neptune-core/issues/539#issuecomment-2764701027
                    if self.global_state_lock.cli().network.use_mock_proof() {
                        debug!("peer discovery disabled when network uses mock proofs (eg regtest)")
                    } else {
                        self.prune_peers().await?;
                        self.reconnect(&mut main_loop_state).await?;
                        self.discover_peers(&mut main_loop_state).await?;
                    }
                }

                // Handle synchronization (i.e. batch-downloading of blocks)
                _ = block_sync_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::block_sync_interval");

                    trace!("Timer: block-synchronization job");
                    self.block_sync(&mut main_loop_state).await?;
                }

                // Clean up mempool: remove stale / too old transactions
                _ = mempool_cleanup_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::mempool_cleanup_interval");

                    debug!("Timer: mempool-cleaner job");
                    self
                        .global_state_lock
                        .lock_guard_mut()
                        .await
                        .mempool_prune_stale_transactions()
                        .await;
                }

                // Clean up incoming UTXO notifications: remove stale / too old
                // UTXO notifications from pool
                _ = utxo_notification_cleanup_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::utxo_notification_cleanup_interval");

                    debug!("Timer: UTXO notification pool cleanup job");

                    // Danger: possible loss of funds.
                    //
                    // See description of prune_stale_expected_utxos().
                    //
                    // This call is disabled until such time as a thorough
                    // evaluation and perhaps reimplementation determines that
                    // it can be called safely without possible loss of funds.
                    // self.global_state_lock.lock_mut(|s| s.wallet_state.prune_stale_expected_utxos()).await;
                }

                // Handle membership proof resynchronization
                _ = mp_resync_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::mp_resync_interval");

                    debug!("Timer: Membership proof resync job");
                    self.global_state_lock.resync_membership_proofs().await?;
                }

                // run the proof upgrader
                _ = tx_proof_upgrade_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::tx_proof_upgrade_interval");

                    trace!("Timer: tx-proof-upgrader");
                    self.proof_upgrader(&mut main_loop_state).await?;
                }
            }
        };

        self.graceful_shutdown(main_loop_state.task_handles).await?;
        info!("Shutdown completed.");

        Ok(exit_code)
    }

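    /// Editor's sketch, not part of the original source: the timer wiring of
    /// `run` in miniature. With `MissedTickBehavior::Delay`, a missed tick
    /// reschedules from "now" instead of firing a burst to catch up.
    #[allow(dead_code)]
    async fn interval_demo() {
        let mut ticker = time::interval(Duration::from_secs(3));
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        for _ in 0..2 {
            select! {
                _ = ticker.tick() => trace!("periodic job runs here"),
            }
        }
    }
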
    /// Handle messages from the RPC server. Returns `true` iff the client should shut down
    /// after handling this message.
    async fn handle_rpc_server_message(
        &mut self,
        msg: RPCServerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<bool> {
        match msg {
            RPCServerToMain::BroadcastTx(transaction) => {
                debug!(
                    "`main` received following transaction from RPC Server. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    transaction.kernel.inputs.len(),
                    transaction.kernel.outputs.len(),
                    transaction.kernel.mutator_set_hash
                );

                // note: this Tx must already have been added to the mempool by
                // sender.  This occurs in GlobalStateLock::record_transaction().

                // Is this a transaction we can share with peers? If so, share
                // it immediately.
                if let Ok(notification) = transaction.as_ref().try_into() {
                    let pmsg = MainToPeerTask::TransactionNotification(notification);
                    self.main_to_peer_broadcast(pmsg);
                } else {
                    // Otherwise, upgrade its proof quality, and share it by
                    // spinning up the proof upgrader.
                    let TransactionProof::Witness(primitive_witness) = transaction.proof.clone()
                    else {
                        panic!("Expected Primitive witness. Got: {:?}", transaction.proof);
                    };

                    let vm_job_queue = vm_job_queue();

                    let proving_capability = self.global_state_lock.cli().proving_capability();
                    let network = self.global_state_lock.cli().network;
                    let upgrade_job = UpgradeJob::from_primitive_witness(
                        network,
                        proving_capability,
                        primitive_witness,
                    );

                    // note: handle_upgrade() hands off proving to the
                    //       triton-vm job queue and waits for job completion.
                    // note: handle_upgrade() broadcasts to peers on success.

                    let global_state_lock_clone = self.global_state_lock.clone();
                    let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
                    let _proof_upgrader_task = tokio::task::Builder::new()
                        .name("proof_upgrader")
                        .spawn(async move {
                        upgrade_job
                            .handle_upgrade(
                                vm_job_queue.clone(),
                                TransactionOrigin::Own,
                                true,
                                global_state_lock_clone,
                                main_to_peer_broadcast_tx_clone,
                            )
                            .await
                    })?;

                    // main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);
                    // If transaction could not be shared immediately because
                    // it contains secret data, upgrade its proof-type.
                }

                // do not shut down
                Ok(false)
            }
            RPCServerToMain::BroadcastMempoolTransactions => {
                info!("Broadcasting transaction notifications for all shareable transactions in mempool");
                let state = self.global_state_lock.lock_guard().await;
                let txs = state.mempool.get_sorted_iter().collect_vec();
                for (txid, _) in txs {
                    // Since a read-lock is held over global state, the
                    // transaction must exist in the mempool.
                    let tx = state
                        .mempool
                        .get(txid)
                        .expect("Transaction from iter must exist in mempool");
                    let notification = TransactionNotification::try_from(tx);
                    match notification {
                        Ok(notification) => {
                            let pmsg = MainToPeerTask::TransactionNotification(notification);
                            self.main_to_peer_broadcast(pmsg);
                        }
                        Err(error) => {
                            warn!("{error}");
                        }
                    };
                }
                Ok(false)
            }
            RPCServerToMain::ClearMempool => {
                info!("Clearing mempool");
                self.global_state_lock
                    .lock_guard_mut()
                    .await
                    .mempool_clear()
                    .await;

                Ok(false)
            }
            RPCServerToMain::ProofOfWorkSolution(new_block) => {
                info!("Handling PoW solution from RPC call");

                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
                Ok(false)
            }
            RPCServerToMain::PauseMiner => {
                info!("Received RPC request to stop miner");

                self.main_to_miner_tx.send(MainToMiner::StopMining);
                Ok(false)
            }
            RPCServerToMain::RestartMiner => {
                info!("Received RPC request to start miner");
                self.main_to_miner_tx.send(MainToMiner::StartMining);
                Ok(false)
            }
            RPCServerToMain::Shutdown => {
                info!("Received RPC shutdown request.");

                // shut down
                Ok(true)
            }
        }
    }

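    /// Editor's sketch, not part of the original source: the `try_into` gate
    /// in the BroadcastTx arm above. A fallible conversion decides between the
    /// cheap "notify peers now" path and the slow proof-upgrade path.
    #[allow(dead_code)]
    fn share_or_upgrade_demo(proof_is_shareable: bool) {
        let notification: Result<&str, &str> = if proof_is_shareable {
            Ok("notification") // stand-in for a TransactionNotification
        } else {
            Err("proof still contains secret witness data")
        };
        match notification {
            Ok(n) => debug!("broadcasting {n} to peers immediately"),
            Err(reason) => debug!("deferring broadcast; upgrading proof first: {reason}"),
        }
    }
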
    async fn graceful_shutdown(&mut self, task_handles: Vec<JoinHandle<()>>) -> Result<()> {
        info!("Shutdown initiated.");

        // Stop mining
        self.main_to_miner_tx.send(MainToMiner::Shutdown);

        // Send 'bye' message to all peers.
        let pmsg = MainToPeerTask::DisconnectAll();
        self.main_to_peer_broadcast(pmsg);
        debug!("sent bye");

        // Flush all databases
        self.global_state_lock.flush_databases().await?;

        tokio::time::sleep(Duration::from_millis(50)).await;

        // Spawned tasks should have finished by now. If not, abort them forcefully.
        task_handles.iter().for_each(|jh| jh.abort());

        // wait for all to finish.
        futures::future::join_all(task_handles).await;

        Ok(())
    }

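    /// Editor's sketch, not part of the original source: abort-then-join, the
    /// shutdown pattern used above. Aborted tasks must still be awaited so
    /// their resources are released deterministically.
    #[allow(dead_code)]
    async fn abort_and_join_demo(handles: Vec<JoinHandle<()>>) {
        for handle in &handles {
            handle.abort(); // request cancellation; returns immediately
        }
        // Resolves once every task has terminated, whether it ran to
        // completion or was cancelled.
        futures::future::join_all(handles).await;
    }
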
    // Broadcasts a message to peers (if any are connected).
    //
    // Panics if the broadcast fails while the channel's receiver_count is
    // non-zero, i.e. while we still have peer connections.
    fn main_to_peer_broadcast(&self, msg: MainToPeerTask) {
        if let Err(e) = self.main_to_peer_broadcast_tx.send(msg) {
            // tbd: maybe we should just log an error and ignore rather
            // than panic.  but for now this preserves prior behavior
            let receiver_count = self.main_to_peer_broadcast_tx.receiver_count();
            assert_eq!(
                receiver_count, 0,
                "failed to broadcast message from main to {} peer loops: {:?}",
                receiver_count, e
            );
        }
    }
}

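/// Editor's sketch, not part of the original source: `broadcast::Sender::send`
/// only errs when no receivers are subscribed, which is exactly what the
/// assertion in `main_to_peer_broadcast` checks via `receiver_count`.
#[allow(dead_code)]
fn broadcast_no_receivers_demo() {
    let (tx, rx) = broadcast::channel::<u8>(4);
    drop(rx); // no active receivers remain
    assert!(tx.send(7).is_err());
    assert_eq!(0, tx.receiver_count());
}
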
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::str::FromStr;
    use std::time::UNIX_EPOCH;

    use macro_rules_attr::apply;
    use tracing_test::traced_test;

    use super::*;
    use crate::config_models::cli_args;
    use crate::config_models::network::Network;
    use crate::tests::shared::get_dummy_peer_incoming;
    use crate::tests::shared::get_test_genesis_setup;
    use crate::tests::shared::invalid_empty_block;
    use crate::tests::shared_tokio_runtime;
    use crate::MINER_CHANNEL_CAPACITY;

    impl MainLoopHandler {
        fn mutable(&mut self) -> MutableMainLoopState {
            MutableMainLoopState::new(std::mem::take(&mut self.task_handles))
        }
    }

    struct TestSetup {
        main_loop_handler: MainLoopHandler,
        main_to_peer_rx: broadcast::Receiver<MainToPeerTask>,
    }

    async fn setup(num_init_peers_outgoing: u8, num_peers_incoming: u8) -> TestSetup {
        const CHANNEL_CAPACITY_MINER_TO_MAIN: usize = 10;

        let network = Network::Main;
        let (
            main_to_peer_tx,
            main_to_peer_rx,
            peer_to_main_tx,
            peer_to_main_rx,
            mut state,
            _own_handshake_data,
        ) = get_test_genesis_setup(network, num_init_peers_outgoing, cli_args::Args::default())
            .await
            .unwrap();
        assert!(
            state
                .lock_guard()
                .await
                .net
                .peer_map
                .iter()
                .all(|(_addr, peer)| peer.connection_is_outbound()),
            "Test assumption: All initial peers must represent outgoing connections."
        );

        for i in 0..num_peers_incoming {
            let peer_address = SocketAddr::from_str(&format!("255.254.253.{i}:8080")).unwrap();
            state
                .lock_guard_mut()
                .await
                .net
                .peer_map
                .insert(peer_address, get_dummy_peer_incoming(peer_address));
        }

        let incoming_peer_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();

        let (main_to_miner_tx, _main_to_miner_rx) =
            mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
        let (_miner_to_main_tx, miner_to_main_rx) =
            mpsc::channel::<MinerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
        let (_rpc_server_to_main_tx, rpc_server_to_main_rx) =
            mpsc::channel::<RPCServerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);

        let task_join_handles = vec![];

        let main_loop_handler = MainLoopHandler::new(
            incoming_peer_listener,
            state,
            main_to_peer_tx,
            peer_to_main_tx,
            main_to_miner_tx,
            peer_to_main_rx,
            miner_to_main_rx,
            rpc_server_to_main_rx,
            task_join_handles,
        );
        TestSetup {
            main_loop_handler,
            main_to_peer_rx,
        }
    }

    #[apply(shared_tokio_runtime)]
    async fn handle_self_guessed_block_new_tip() {
        // A new tip is registered by main_loop. Verify correct state update.
        let TestSetup {
            mut main_loop_handler,
            mut main_to_peer_rx,
            ..
        } = setup(1, 0).await;
        let network = main_loop_handler.global_state_lock.cli().network;
        let mut mutable_main_loop_state = main_loop_handler.mutable();

        let block1 = invalid_empty_block(&Block::genesis(network));

        assert!(
            main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .header()
                .height
                .is_genesis(),
            "Tip must be genesis prior to handling of new block"
        );

        let block1 = Box::new(block1);
        main_loop_handler
            .handle_self_guessed_block(&mut mutable_main_loop_state, block1.clone())
            .await
            .unwrap();
        let new_block_height: u64 = main_loop_handler
            .global_state_lock
            .lock_guard()
            .await
            .chain
            .light_state()
            .header()
            .height
            .into();
        assert_eq!(
            1u64, new_block_height,
            "Tip height must be 1 after handling of new block"
        );
        let msg_to_peer_loops = main_to_peer_rx.recv().await.unwrap();
        if let MainToPeerTask::Block(block_to_peers) = msg_to_peer_loops {
            assert_eq!(
                block1, block_to_peers,
                "Peer loops must have received block 1"
            );
        } else {
            panic!("Must have sent block notification to peer loops")
        }
    }

    mod sync_mode {
        use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
        use test_strategy::proptest;

        use super::*;
        use crate::tests::shared::get_dummy_socket_address;

        #[proptest]
        fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) {
            batch_request_heights_sanity(own_height);
        }

        #[test]
        fn batch_request_heights_unit() {
            let own_height = 1_000_000u64;
            batch_request_heights_sanity(own_height);
        }

        fn batch_request_heights_sanity(own_height: u64) {
            let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into());

            let mut heights_rev = heights.clone();
            heights_rev.reverse();
            assert!(
                heights_rev.is_sorted(),
                "Heights must be sorted from high-to-low"
            );

            heights_rev.dedup();
            assert_eq!(heights_rev.len(), heights.len(), "duplicates");

            assert_eq!(heights[0], own_height.into(), "starts with own tip height");
            assert!(
                heights.last().unwrap().is_genesis(),
                "ends with genesis block"
            );
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn sync_mode_abandoned_on_global_timeout() {
            let num_outgoing_connections = 0;
            let num_incoming_connections = 0;
            let TestSetup {
                mut main_loop_handler,
                main_to_peer_rx: _main_to_peer_rx,
                ..
            } = setup(num_outgoing_connections, num_incoming_connections).await;
            let mut mutable_main_loop_state = main_loop_handler.mutable();

            main_loop_handler
                .block_sync(&mut mutable_main_loop_state)
                .await
                .expect("Must return OK when no sync mode is set");

            // Mock that we are in a valid sync state
            let claimed_max_height = 1_000u64.into();
            let claimed_max_pow = ProofOfWork::new([100; 6]);
            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .net
                .sync_anchor = Some(SyncAnchor::new(
                claimed_max_pow,
                MmrAccumulator::new_from_leafs(vec![]),
            ));
            mutable_main_loop_state.sync_state.peer_sync_states.insert(
                get_dummy_socket_address(0),
                PeerSynchronizationState::new(claimed_max_height, claimed_max_pow),
            );

            let sync_start_time = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .net
                .sync_anchor
                .as_ref()
                .unwrap()
                .updated;
            main_loop_handler
                .block_sync(&mut mutable_main_loop_state)
                .await
                .expect("Must return OK when sync mode has not timed out yet");
            assert!(
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .sync_anchor
                    .is_some(),
                "Sync mode must still be set before timeout has occurred"
            );

            assert_eq!(
                sync_start_time,
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .sync_anchor
                    .as_ref()
                    .unwrap()
                    .updated,
                "timestamp may not be updated without state change"
            );

            // Mock that sync-mode has timed out
            main_loop_handler = main_loop_handler.with_mocked_time(
                SystemTime::now() + GLOBAL_SYNCHRONIZATION_TIMEOUT + Duration::from_secs(1),
            );

            main_loop_handler
                .block_sync(&mut mutable_main_loop_state)
                .await
                .expect("Must return OK when sync mode has timed out");
            assert!(
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .sync_anchor
                    .is_none(),
                "Sync mode must be unset on timeout"
            );
        }
    }

    mod proof_upgrader {
        use super::*;
        use crate::models::blockchain::transaction::Transaction;
        use crate::models::blockchain::transaction::TransactionProof;
        use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
        use crate::models::proof_abstractions::timestamp::Timestamp;
        use crate::models::state::tx_creation_config::TxCreationConfig;
        use crate::models::state::wallet::transaction_output::TxOutput;

        async fn tx_no_outputs(
            global_state_lock: &mut GlobalStateLock,
            tx_proof_type: TxProvingCapability,
            fee: NativeCurrencyAmount,
        ) -> Arc<Transaction> {
            let change_key = global_state_lock
                .lock_guard()
                .await
                .wallet_state
                .wallet_entropy
                .nth_generation_spending_key_for_tests(0);
            let in_seven_months = global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .header()
                .timestamp
                + Timestamp::months(7);

            let config = TxCreationConfig::default()
                .recover_change_off_chain(change_key.into())
                .with_prover_capability(tx_proof_type);
            global_state_lock
                .api()
                .tx_initiator_internal()
                .create_transaction(Vec::<TxOutput>::new().into(), fee, in_seven_months, config)
                .await
                .unwrap()
                .transaction
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn upgrade_proof_collection_to_single_proof_foreign_tx() {
            let num_outgoing_connections = 0;
            let num_incoming_connections = 0;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(num_outgoing_connections, num_incoming_connections).await;

            // Force instance to create SingleProofs, otherwise CI and other
            // weak machines fail.
            let mocked_cli = cli_args::Args {
                tx_proving_capability: Some(TxProvingCapability::SingleProof),
                tx_proof_upgrade_interval: 100, // seconds
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let mut main_loop_handler = main_loop_handler.with_mocked_time(SystemTime::now());
            let mut mutable_main_loop_state = main_loop_handler.mutable();

            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task returns OK when run on empty mempool"
            );

            let fee = NativeCurrencyAmount::coins(1);
            let proof_collection_tx = tx_no_outputs(
                &mut main_loop_handler.global_state_lock,
                TxProvingCapability::ProofCollection,
                fee,
            )
            .await;

            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .mempool_insert((*proof_collection_tx).clone(), TransactionOrigin::Foreign)
                .await;

            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task returns OK when it's not yet time to upgrade"
            );

            assert!(
                matches!(
                    main_loop_handler
                        .global_state_lock
                        .lock_guard()
                        .await
                        .mempool
                        .get(proof_collection_tx.kernel.txid())
                        .unwrap()
                        .proof,
                    TransactionProof::ProofCollection(_)
                ),
                "Proof in mempool must still be of type proof collection"
            );

            // Mock that enough time has passed to perform the upgrade. Then
            // perform the upgrade.
            let mut main_loop_handler =
                main_loop_handler.with_mocked_time(SystemTime::now() + Duration::from_secs(300));
            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task must return OK when it's time to upgrade"
            );

            // Wait for upgrade task to finish.
            let handle = mutable_main_loop_state.proof_upgrader_task.unwrap().await;
            assert!(
                handle.is_ok(),
                "Proof-upgrade task must finish successfully."
            );

            // At this point there should be one transaction in the mempool,
            // which is (if all is well) the merger of the ProofCollection
            // transaction inserted above and one of the upgrader's fee
            // gobblers. The point is that this transaction is a SingleProof
            // transaction, so test that.

            let (merged_txid, _) = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .mempool
                .get_sorted_iter()
                .next_back()
                .expect("mempool should contain one item here");

            assert!(
                matches!(
                    main_loop_handler
                        .global_state_lock
                        .lock_guard()
                        .await
                        .mempool
                        .get(merged_txid)
                        .unwrap()
                        .proof,
                    TransactionProof::SingleProof(_)
                ),
                "Proof in mempool must now be of type single proof"
            );

            match main_to_peer_rx.recv().await {
                Ok(MainToPeerTask::TransactionNotification(tx_noti)) => {
                    assert_eq!(merged_txid, tx_noti.txid);
                    assert_eq!(TransactionProofQuality::SingleProof, tx_noti.proof_quality);
                },
                other => panic!("Must have sent transaction notification to peer loop after successful proof upgrade. Got:\n{other:?}"),
            }
        }
    }

    mod peer_discovery {
        use super::*;

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn prune_peers_too_many_connections() {
            let num_init_peers_outgoing = 10;
            let num_init_peers_incoming = 4;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;

            let mocked_cli = cli_args::Args {
                max_num_peers: num_init_peers_outgoing as usize,
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            main_loop_handler.prune_peers().await.unwrap();
            assert_eq!(4, main_to_peer_rx.len());
            for _ in 0..4 {
                let peer_msg = main_to_peer_rx.recv().await.unwrap();
                assert!(matches!(peer_msg, MainToPeerTask::Disconnect(_)))
            }
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn prune_peers_not_too_many_connections() {
            let num_init_peers_outgoing = 10;
            let num_init_peers_incoming = 1;
            let TestSetup {
                mut main_loop_handler,
                main_to_peer_rx,
                ..
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;

            let mocked_cli = cli_args::Args {
                max_num_peers: 200,
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            main_loop_handler.prune_peers().await.unwrap();
            assert!(main_to_peer_rx.is_empty());
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn skip_peer_discovery_if_peer_limit_is_exceeded() {
            let num_init_peers_outgoing = 2;
            let num_init_peers_incoming = 0;
            let TestSetup {
                mut main_loop_handler,
                ..
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;

            let mocked_cli = cli_args::Args {
                max_num_peers: 0,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let mut mutable_state = main_loop_handler.mutable();
            main_loop_handler
                .discover_peers(&mut mutable_state)
                .await
                .unwrap();

            assert!(logs_contain("Skipping peer discovery."));
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn performs_peer_discovery_on_few_connections() {
            let num_init_peers_outgoing = 2;
            let num_init_peers_incoming = 0;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;

            // Set CLI to attempt to make more connections
            let mocked_cli = cli_args::Args {
                max_num_peers: 10,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let mut mutable_state = main_loop_handler.mutable();
            main_loop_handler
                .discover_peers(&mut mutable_state)
                .await
                .unwrap();

            let peer_discovery_sent_messages_on_peer_channel = main_to_peer_rx.try_recv().is_ok();
            assert!(peer_discovery_sent_messages_on_peer_channel);
            assert!(logs_contain("Performing peer discovery"));
        }
    }

    #[test]
    fn older_systemtime_ranks_first() {
        let start = UNIX_EPOCH;
        let other = UNIX_EPOCH + Duration::from_secs(1000);
        let mut instants = [start, other];

        assert_eq!(
            start,
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
        );

        instants.reverse();

        assert_eq!(
            start,
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
        );
    }

    mod bootstrapper_mode {

        use rand::Rng;

        use super::*;
        use crate::models::peer::PeerMessage;
        use crate::models::peer::TransferConnectionStatus;
        use crate::tests::shared::get_dummy_peer_connection_data_genesis;
        use crate::tests::shared::to_bytes;

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn disconnect_from_oldest_peer_upon_connection_request() {
            // Set up a node in bootstrapper mode, connected to one peer fewer
            // than the maximum. Initiate a connection request and verify that
            // the oldest of the existing connections is dropped.

            let network = Network::Main;
            let num_init_peers_outgoing = 5;
            let num_init_peers_incoming = 0;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;

            let mocked_cli = cli_args::Args {
                max_num_peers: usize::from(num_init_peers_outgoing) + 1,
                bootstrap: true,
                network,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            let mut mutable_main_loop_state = main_loop_handler.mutable();

            // check sanity: at startup, we are connected to the initial number of peers
            assert_eq!(
                usize::from(num_init_peers_outgoing),
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .peer_map
                    .len()
            );

            // Randomize the "connection established" timestamps so that the
            // oldest connection is not simply the first one inserted.
            let mut rng = rand::rng();
            let now = SystemTime::now();
            let now_as_unix_timestamp = now.duration_since(UNIX_EPOCH).unwrap();
            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .net
                .peer_map
                .iter_mut()
                .for_each(|(_socket_address, peer_info)| {
                    peer_info.set_connection_established(
                        UNIX_EPOCH
                            + Duration::from_millis(
                                rng.random_range(0..(now_as_unix_timestamp.as_millis() as u64)),
                            ),
                    );
                });

            // compute which peer will be dropped, for later reference
            let expected_drop_peer_socket_address = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .net
                .peer_map
                .iter()
                .min_by(|l, r| {
                    l.1.connection_established()
                        .cmp(&r.1.connection_established())
                })
                .map(|(socket_address, _peer_info)| socket_address)
                .copied()
                .unwrap();

            // simulate incoming connection
            let (peer_handshake_data, peer_socket_address) =
                get_dummy_peer_connection_data_genesis(network, 1);
            let own_handshake_data = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .get_own_handshakedata();
            assert_eq!(peer_handshake_data.network, own_handshake_data.network);
            assert_eq!(peer_handshake_data.version, own_handshake_data.version);
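            // Script the wire exchange byte for byte: read the peer's
            // handshake request, then write our handshake response followed
            // by an `Accepted` connection status.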
            let mock_stream = tokio_test::io::Builder::new()
                .read(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_REQUEST.to_vec(),
                        peer_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_RESPONSE.to_vec(),
                        own_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::ConnectionStatus(
                        TransferConnectionStatus::Accepted,
                    ))
                    .unwrap(),
                )
                .build();
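            // Hand the spawned peer task its own channel handles; a fresh
            // broadcast channel stands in for the main-to-peer channel so
            // that `main_to_peer_rx` above only sees messages from the main
            // loop under test.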
            let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone();
            let global_state_lock_clone = main_loop_handler.global_state_lock.clone();
            let (_main_to_peer_tx_mock, main_to_peer_rx_mock) = tokio::sync::broadcast::channel(10);
            let incoming_peer_task_handle = tokio::task::Builder::new()
                .name("answer_peer_wrapper")
                .spawn(async move {
                    match answer_peer(
                        mock_stream,
                        global_state_lock_clone,
                        peer_socket_address,
                        main_to_peer_rx_mock,
                        peer_to_main_tx_clone,
                        own_handshake_data,
                    )
                    .await
                    {
                        Ok(()) => (),
                        Err(err) => error!("Got error: {:?}", err),
                    }
                })
                .unwrap();

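            // Since the node runs in bootstrap mode and the incoming
            // connection hits the peer limit, the peer task asks the main
            // loop to make room rather than rejecting the connection.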
            // `answer_peer_wrapper` should send a
            // `DisconnectFromLongestLivedPeer` message to main
            let peer_to_main_message = main_loop_handler.peer_task_to_main_rx.recv().await.unwrap();
            assert!(matches!(
                peer_to_main_message,
                PeerTaskToMain::DisconnectFromLongestLivedPeer,
            ));

            // process this message
            main_loop_handler
                .handle_peer_task_message(peer_to_main_message, &mut mutable_main_loop_state)
                .await
                .unwrap();

            // main loop should send a `Disconnect` message
            let main_to_peers_message = main_to_peer_rx.recv().await.unwrap();
            let MainToPeerTask::Disconnect(observed_drop_peer_socket_address) =
                main_to_peers_message
            else {
                panic!("Expected disconnect, got {main_to_peers_message:?}");
            };

            // match the observed droppee against the expectation
            assert_eq!(
                expected_drop_peer_socket_address,
                observed_drop_peer_socket_address,
            );
            println!("Dropped connection with {expected_drop_peer_socket_address}.");

            // don't forget to terminate the peer task, which is still running
            incoming_peer_task_handle.abort();
        }
    }
}