Neptune-Crypto / neptune-core, build 15778331933

20 Jun 2025 11:52 AM UTC. Coverage: 72.25% (+0.4% from 71.873%)

Pull Request #610: Refactor mempool: Preserve PW over block updates
Merge 0253eb5b0 into 3b7351ed6

651 of 753 new or added lines in 19 files covered (86.45%).
37 existing lines in 9 files are now uncovered.
20610 of 28526 relevant lines covered (72.25%).
Average hits per line: 483914.76.

Source file: /src/main_loop.rs (63.27% covered)

pub mod proof_upgrader;
pub(crate) mod upgrade_incentive;

use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;

use anyhow::Result;
use itertools::Itertools;
use proof_upgrader::get_upgrade_task_from_mempool;
use proof_upgrader::UpdateMutatorSetDataJob;
use proof_upgrader::UpgradeJob;
use rand::prelude::IteratorRandom;
use rand::seq::IndexedRandom;
use tokio::net::TcpListener;
use tokio::select;
use tokio::signal;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time;
use tokio::time::Instant;
use tokio::time::MissedTickBehavior;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use crate::connect_to_peers::answer_peer;
use crate::connect_to_peers::call_peer;
use crate::macros::fn_name;
use crate::macros::log_slow_scope;
use crate::main_loop::proof_upgrader::PrimitiveWitnessToProofCollection;
use crate::main_loop::proof_upgrader::SEARCH_DEPTH_FOR_BLOCKS_FOR_MS_UPDATE;
use crate::main_loop::upgrade_incentive::UpgradeIncentive;
use crate::models::blockchain::block::block_header::BlockHeader;
use crate::models::blockchain::block::block_height::BlockHeight;
use crate::models::blockchain::block::difficulty_control::ProofOfWork;
use crate::models::blockchain::block::Block;
use crate::models::blockchain::transaction::Transaction;
use crate::models::blockchain::transaction::TransactionProof;
use crate::models::channel::MainToMiner;
use crate::models::channel::MainToPeerTask;
use crate::models::channel::MainToPeerTaskBatchBlockRequest;
use crate::models::channel::MinerToMain;
use crate::models::channel::PeerTaskToMain;
use crate::models::channel::RPCServerToMain;
use crate::models::peer::handshake_data::HandshakeData;
use crate::models::peer::peer_info::PeerInfo;
use crate::models::peer::transaction_notification::TransactionNotification;
use crate::models::peer::PeerSynchronizationState;
use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
use crate::models::state::block_proposal::BlockProposal;
use crate::models::state::mempool::mempool_update_job::MempoolUpdateJob;
use crate::models::state::mempool::mempool_update_job_result::MempoolUpdateJobResult;
use crate::models::state::mempool::upgrade_priority::UpgradePriority;
use crate::models::state::networking_state::SyncAnchor;
use crate::models::state::tx_proving_capability::TxProvingCapability;
use crate::models::state::GlobalState;
use crate::models::state::GlobalStateLock;
use crate::triton_vm_job_queue::vm_job_queue;
use crate::triton_vm_job_queue::TritonVmJobPriority;
use crate::triton_vm_job_queue::TritonVmJobQueue;
use crate::SUCCESS_EXIT_CODE;

const PEER_DISCOVERY_INTERVAL: Duration = Duration::from_secs(2 * 60);
const SYNC_REQUEST_INTERVAL: Duration = Duration::from_secs(3);
const MEMPOOL_PRUNE_INTERVAL: Duration = Duration::from_secs(30 * 60);
const MP_RESYNC_INTERVAL: Duration = Duration::from_secs(59);
const PROOF_UPGRADE_INTERVAL: Duration = Duration::from_secs(10);
const EXPECTED_UTXOS_PRUNE_INTERVAL: Duration = Duration::from_secs(19 * 60);

const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;

/// Number of seconds within which an individual peer is expected to respond
/// to a synchronization request.
const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT: Duration =
    Duration::from_secs(SYNC_REQUEST_INTERVAL.as_secs() * SANCTION_PEER_TIMEOUT_FACTOR);

/// Number of seconds that a synchronization may run without any progress.
const GLOBAL_SYNCHRONIZATION_TIMEOUT: Duration =
    Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT.as_secs() * 4);
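
// With the values above: an individual peer has 3 s * 40 = 120 s to answer a
// synchronization request, and synchronization as a whole times out after
// 120 s * 4 = 480 s without progress.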

const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;
const TX_UPDATER_CHANNEL_CAPACITY: usize = 1;

/// Wraps a transmission channel.
///
/// To be used for the transmission channel to the miner, because
///  a) the miner might not exist in which case there would be no-one to empty
///     the channel; and
///  b) contrary to other channels, transmission failures here are not critical.
#[derive(Debug)]
struct MainToMinerChannel(Option<mpsc::Sender<MainToMiner>>);

impl MainToMinerChannel {
    /// Send a message to the miner task (if any).
    fn send(&self, message: MainToMiner) {
        // Do not use the async `send` function because it blocks until there
        // is spare capacity on the channel. Messages to the miner are not
        // critical, so if there is no capacity left, just log an error
        // message.
        if let Some(channel) = &self.0 {
            if let Err(e) = channel.try_send(message) {
                error!("Failed to send pause message to miner thread:\n{e}");
            }
        }
    }
}
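
// Illustrative use of `MainToMinerChannel` (hypothetical setup, not from the
// source): sending never blocks the main loop, whether the channel is full or
// absent.
//
//     let (tx, _rx) = mpsc::channel::<MainToMiner>(1);
//     MainToMinerChannel(Some(tx)).send(MainToMiner::Continue); // try_send; logs on failure
//     MainToMinerChannel(None).send(MainToMiner::Continue);     // no miner: message dropped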

/// `MainLoopHandler` is the immutable part of the input to the main loop function.
#[derive(Debug)]
pub struct MainLoopHandler {
    incoming_peer_listener: TcpListener,
    global_state_lock: GlobalStateLock,

    // note: broadcast::Sender::send() does not block
    main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,

    // note: mpsc::Sender::send() blocks if channel full.
    // locks should not be held across it.
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,

    // note: MainToMinerChannel::send() does not block.  might log error.
    main_to_miner_tx: MainToMinerChannel,

    peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
    miner_to_main_rx: mpsc::Receiver<MinerToMain>,
    rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
    task_handles: Vec<JoinHandle<()>>,

    #[cfg(test)]
    mock_now: Option<SystemTime>,
}

/// The mutable part of the main loop function
struct MutableMainLoopState {
    /// Information used to batch-download blocks.
    sync_state: SyncState,

    /// Information about potential peers for new connections.
    potential_peers: PotentialPeersState,

    /// A list of join-handles to spawned tasks.
    task_handles: Vec<JoinHandle<()>>,

    /// A join-handle to a task performing transaction-proof upgrades.
    proof_upgrader_task: Option<JoinHandle<()>>,

    /// A join-handle to a task running the update of the mempool transactions.
    update_mempool_txs_handle: Option<JoinHandle<()>>,

    /// A channel that the task updating mempool transactions can use to
    /// communicate its result.
    update_mempool_receiver: mpsc::Receiver<Vec<MempoolUpdateJobResult>>,
}

impl MutableMainLoopState {
    fn new(task_handles: Vec<JoinHandle<()>>) -> Self {
        let (_dummy_sender, dummy_receiver) =
            mpsc::channel::<Vec<MempoolUpdateJobResult>>(TX_UPDATER_CHANNEL_CAPACITY);
        Self {
            sync_state: SyncState::default(),
            potential_peers: PotentialPeersState::default(),
            task_handles,
            proof_upgrader_task: None,
            update_mempool_txs_handle: None,
            update_mempool_receiver: dummy_receiver,
        }
    }
}

/// handles batch-downloading of blocks if we are more than n blocks behind
#[derive(Default, Debug)]
struct SyncState {
    peer_sync_states: HashMap<SocketAddr, PeerSynchronizationState>,
    last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>,
}

impl SyncState {
    fn record_request(
        &mut self,
        requested_block_height: BlockHeight,
        peer: SocketAddr,
        now: SystemTime,
    ) {
        self.last_sync_request = Some((now, requested_block_height, peer));
    }

    /// Return a list of peers that have reported being in possession of
    /// blocks with a PoW above a threshold.
    fn get_potential_peers_for_sync_request(&self, threshold_pow: ProofOfWork) -> Vec<SocketAddr> {
        self.peer_sync_states
            .iter()
            .filter(|(_sa, sync_state)| sync_state.claimed_max_pow > threshold_pow)
            .map(|(sa, _)| *sa)
            .collect()
    }

    /// Determine if a peer should be sanctioned for failing to respond to a
    /// synchronization request fast enough. Also determine if a new request
    /// should be made or the previous one should be allowed to run for longer.
    ///
    /// Returns (peer to be sanctioned, attempt new request).
    fn get_status_of_last_request(
        &self,
        current_block_height: BlockHeight,
        now: SystemTime,
    ) -> (Option<SocketAddr>, bool) {
        // A peer is sanctioned if no answer has been received after N times
        // the sync request interval.
        match self.last_sync_request {
            None => {
                // No sync request has been made since startup of program
                (None, true)
            }
            Some((req_time, requested_height, peer_sa)) => {
                if requested_height < current_block_height {
                    // The last sync request updated the state
                    (None, true)
                } else if req_time + INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT < now {
                    // The last sync request was not answered, sanction peer
                    // and make a new sync request.
                    (Some(peer_sa), true)
                } else {
                    // The last sync request has not yet been answered. But it has
                    // not timed out yet.
                    (None, false)
                }
            }
        }
    }
}
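
// Note on `get_status_of_last_request`: with the constants above, a pending
// sync request is only given up on (and its peer sanctioned) after 120 s pass
// without an answer and without the own tip height increasing.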

/// holds information about a potential peer in the process of peer discovery
struct PotentialPeerInfo {
    _reported: SystemTime,
    _reported_by: SocketAddr,
    instance_id: u128,
    distance: u8,
}

impl PotentialPeerInfo {
    fn new(reported_by: SocketAddr, instance_id: u128, distance: u8, now: SystemTime) -> Self {
        Self {
            _reported: now,
            _reported_by: reported_by,
            instance_id,
            distance,
        }
    }
}

/// holds information about a set of potential peers in the process of peer discovery
struct PotentialPeersState {
    potential_peers: HashMap<SocketAddr, PotentialPeerInfo>,
}

impl PotentialPeersState {
    fn default() -> Self {
        Self {
            potential_peers: HashMap::new(),
        }
    }

    fn add(
        &mut self,
        reported_by: SocketAddr,
        potential_peer: (SocketAddr, u128),
        max_peers: usize,
        distance: u8,
        now: SystemTime,
    ) {
        let potential_peer_socket_address = potential_peer.0;
        let potential_peer_instance_id = potential_peer.1;

        // This check *should* make it likely that a potential peer is always
        // registered with the lowest observed distance.
        if self
            .potential_peers
            .contains_key(&potential_peer_socket_address)
        {
            return;
        }

        // If this data structure is full, remove a random entry. Then add this.
        if self.potential_peers.len()
            > max_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS
        {
            let mut rng = rand::rng();
            let random_potential_peer = self
                .potential_peers
                .keys()
                .choose(&mut rng)
                .unwrap()
                .to_owned();
            self.potential_peers.remove(&random_potential_peer);
        }

        let insert_value =
            PotentialPeerInfo::new(reported_by, potential_peer_instance_id, distance, now);
        self.potential_peers
            .insert(potential_peer_socket_address, insert_value);
    }
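
    // Capacity note: with POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS
    // = 20, a node configured for e.g. 10 peers retains at most roughly
    // 10 * 20 = 200 potential-peer entries before random eviction kicks in.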

    /// Return a peer from the potential peer list that we aren't connected to
    /// and that isn't our own address.
    ///
    /// Favors peers with a high distance and with IPs that we are not already
    /// connected to.
    ///
    /// Returns (socket address, peer distance)
    fn get_candidate(
        &self,
        connected_clients: &[PeerInfo],
        own_instance_id: u128,
    ) -> Option<(SocketAddr, u8)> {
        let peers_instance_ids: Vec<u128> =
            connected_clients.iter().map(|x| x.instance_id()).collect();

        // Only pick those peers that report a listening port
        let peers_listen_addresses: Vec<SocketAddr> = connected_clients
            .iter()
            .filter_map(|x| x.listen_address())
            .collect();

        // Find the appropriate candidates
        let candidates = self
            .potential_peers
            .iter()
            // Prevent connecting to self. Note that we *only* use instance ID to prevent this,
            // meaning this will allow multiple nodes e.g. running on the same computer to form
            // a complete graph.
            .filter(|pp| pp.1.instance_id != own_instance_id)
            // Prevent connecting to peer we already are connected to
            .filter(|potential_peer| !peers_instance_ids.contains(&potential_peer.1.instance_id))
            .filter(|potential_peer| !peers_listen_addresses.contains(potential_peer.0))
            .collect::<Vec<_>>();

        // Prefer candidates with IPs that we are not already connected to but
        // connect to repeated IPs in case we don't have other options, as
        // repeated IPs may just be multiple machines on the same NAT'ed IPv4
        // address.
        let mut connected_ips = peers_listen_addresses.into_iter().map(|x| x.ip());
        let candidates = if candidates
            .iter()
            .any(|candidate| !connected_ips.contains(&candidate.0.ip()))
        {
            candidates
                .into_iter()
                .filter(|candidate| !connected_ips.contains(&candidate.0.ip()))
                .collect()
        } else {
            candidates
        };

        // Get the candidate list with the highest distance
        let max_distance_candidates = candidates.iter().max_by_key(|pp| pp.1.distance);

        // Pick a random candidate from the appropriate candidates
        let mut rng = rand::rng();
        max_distance_candidates
            .iter()
            .choose(&mut rng)
            .map(|x| (x.0.to_owned(), x.1.distance))
    }
}

/// Return a boolean indicating whether the client should stay in
/// synchronization mode.
fn stay_in_sync_mode(
    own_block_tip_header: &BlockHeader,
    sync_state: &SyncState,
    sync_mode_threshold: usize,
) -> bool {
    let max_claimed_pow = sync_state
        .peer_sync_states
        .values()
        .max_by_key(|x| x.claimed_max_pow);
    match max_claimed_pow {
        None => false, // No peer has passed the sync challenge phase.

        // Synchronization is left when the remaining number of blocks is half
        // of what has been indicated to fit into RAM.
        Some(max_claim) => {
            own_block_tip_header.cumulative_proof_of_work < max_claim.claimed_max_pow
                && max_claim.claimed_max_height - own_block_tip_header.height
                    > sync_mode_threshold as i128 / 2
        }
    }
}
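
// For example, with sync_mode_threshold = 1000, a node stays in sync mode only
// while some peer claims both more cumulative PoW and a tip more than
// 1000 / 2 = 500 blocks ahead of our own.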

impl MainLoopHandler {
    // todo: find a way to avoid triggering lint
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        incoming_peer_listener: TcpListener,
        global_state_lock: GlobalStateLock,
        main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
        peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
        main_to_miner_tx: mpsc::Sender<MainToMiner>,

        peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
        miner_to_main_rx: mpsc::Receiver<MinerToMain>,
        rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
        task_handles: Vec<JoinHandle<()>>,
    ) -> Self {
        let maybe_main_to_miner_tx = if global_state_lock.cli().mine() {
            Some(main_to_miner_tx)
        } else {
            None
        };
        Self {
            incoming_peer_listener,
            global_state_lock,
            main_to_miner_tx: MainToMinerChannel(maybe_main_to_miner_tx),
            main_to_peer_broadcast_tx,
            peer_task_to_main_tx,

            peer_task_to_main_rx,
            miner_to_main_rx,
            rpc_server_to_main_rx,
            task_handles,

            #[cfg(test)]
            mock_now: None,
        }
    }

    pub fn global_state_lock(&mut self) -> GlobalStateLock {
        self.global_state_lock.clone()
    }

    /// Allows for mocked timestamps such that time dependencies may be tested.
    #[cfg(test)]
    fn with_mocked_time(mut self, mocked_time: SystemTime) -> Self {
        self.mock_now = Some(mocked_time);
        self
    }

    fn now(&self) -> SystemTime {
        #[cfg(not(test))]
        {
            SystemTime::now()
        }
        #[cfg(test)]
        {
            self.mock_now.unwrap_or(SystemTime::now())
        }
    }

    /// Update the mutator set data for a list of mempool transactions.
    ///
    /// Produces transactions with the same proof quality as what was present
    /// in the mempool: a primitive-witness-backed transaction is updated to a
    /// new primitive-witness-backed transaction, a proof collection to a
    /// proof collection, and a single proof to a single proof.
    ///
    /// In the case of a proof collection, the transaction cannot be updated
    /// directly, so the primitive witness is used to accomplish this instead.
    ///
    /// Sends the result back through the provided channel.
    async fn update_mempool_jobs(
        mut global_state_lock: GlobalStateLock,
        update_jobs: Vec<MempoolUpdateJob>,
        job_queue: Arc<TritonVmJobQueue>,
        transaction_update_sender: mpsc::Sender<Vec<MempoolUpdateJobResult>>,
        proof_job_options: TritonVmProofJobOptions,
    ) {
        debug!(
            "Attempting to update transaction witnesses of {} transactions",
            update_jobs.len()
        );
        let mut result = vec![];
        for job in update_jobs {
            let txid = job.txid();
            match &job {
                MempoolUpdateJob::PrimitiveWitness(pw_update)
                | MempoolUpdateJob::ProofCollection(pw_update) => {
                    let old_msa = &pw_update.old_primitive_witness.mutator_set_accumulator;

                    // Acquire lock, and drop it immediately.
                    let msa_update = global_state_lock
                        .lock_guard_mut()
                        .await
                        .chain
                        .archival_state_mut()
                        .get_mutator_set_update_to_tip(
                            old_msa,
                            SEARCH_DEPTH_FOR_BLOCKS_FOR_MS_UPDATE,
                        )
                        .await;
                    let Some(msa_update) = msa_update else {
                        result.push(MempoolUpdateJobResult::Failure(txid));
                        continue;
                    };
                    let new_pw = pw_update
                        .old_primitive_witness
                        .clone()
                        .update_with_new_ms_data(msa_update);
                    let upgraded_tx = match &job {
                        MempoolUpdateJob::PrimitiveWitness(_) => Transaction {
                            kernel: new_pw.kernel.clone(),
                            proof: TransactionProof::Witness(new_pw.clone()),
                        },
                        MempoolUpdateJob::ProofCollection(_) => {
                            let pc_job = PrimitiveWitnessToProofCollection {
                                primitive_witness: new_pw.clone(),
                            };

                            // No locks may be held here!
                            let upgrade_result =
                                pc_job.upgrade(job_queue.clone(), &proof_job_options).await;
                            match upgrade_result {
                                Ok(upgraded) => upgraded,
                                Err(_) => {
                                    result.push(MempoolUpdateJobResult::Failure(txid));
                                    continue;
                                }
                            }
                        }
                        MempoolUpdateJob::SingleProof { .. } => unreachable!(),
                    };

                    result.push(MempoolUpdateJobResult::Success {
                        new_primitive_witness: Some(Box::new(new_pw)),
                        new_transaction: Box::new(upgraded_tx),
                    });
                }
                MempoolUpdateJob::SingleProof {
                    old_kernel,
                    old_single_proof,
                } => {
                    let msa_lookup_result = global_state_lock
                        .lock_guard_mut()
                        .await
                        .chain
                        .archival_state_mut()
                        .old_mutator_set_and_mutator_set_update_to_tip(
                            old_kernel.mutator_set_hash,
                            SEARCH_DEPTH_FOR_BLOCKS_FOR_MS_UPDATE,
                        )
                        .await;
                    let Some((old_mutator_set, mutator_set_update)) = msa_lookup_result else {
                        result.push(MempoolUpdateJobResult::Failure(txid));
                        continue;
                    };
                    let update_job = UpdateMutatorSetDataJob::new(
                        old_kernel.to_owned(),
                        old_single_proof.to_owned(),
                        old_mutator_set,
                        mutator_set_update,
                        UpgradeIncentive::Critical,
                    );

                    // No locks may be held here!
                    let upgrade_result = update_job
                        .upgrade(job_queue.clone(), proof_job_options.clone())
                        .await;
                    let Ok(updated_tx) = upgrade_result else {
                        result.push(MempoolUpdateJobResult::Failure(txid));
                        continue;
                    };

                    result.push(MempoolUpdateJobResult::Success {
                        new_primitive_witness: None,
                        new_transaction: Box::new(updated_tx),
                    });
                }
            }
        }

        transaction_update_sender
            .send(result)
            .await
            .expect("Receiver for updated txs in main loop must still exist");
    }

    /// Handles a list of transactions whose witness data has been updated to
    /// be valid under a new mutator set.
    async fn handle_updated_mempool_txs(&mut self, update_results: Vec<MempoolUpdateJobResult>) {
        {
            let mut state = self.global_state_lock.lock_guard_mut().await;
            for update_result in &update_results {
                match update_result {
                    MempoolUpdateJobResult::Failure(txkid) => {
                        warn!(
                            "Failed to update transaction {txkid} to be valid under new mutator \
                        set. Removing from the mempool."
                        );
                        state.mempool_remove(*txkid).await
                    }
                    MempoolUpdateJobResult::Success {
                        new_primitive_witness,
                        new_transaction,
                    } => {
                        let txid = new_transaction.kernel.txid();
                        info!("Updated transaction {txid} to be valid under new mutator set");

                        // First update the primitive-witness data associated with the transaction,
                        // then insert the new transaction into the mempool. This ensures that the
                        // primitive-witness is as up-to-date as possible in case it has to be
                        // updated again later.
                        if let Some(new_pw) = new_primitive_witness {
                            state
                                .mempool_update_primitive_witness(txid, *new_pw.to_owned())
                                .await;
                        }
                        state
                            .mempool_insert(*new_transaction.to_owned(), UpgradePriority::Critical)
                            .await;
                    }
                }
            }
        }

        // Then notify all peers about shareable transactions.
        for updated in update_results {
            if let MempoolUpdateJobResult::Success {
                new_transaction, ..
            } = updated
            {
                if let Ok(pmsg) = new_transaction.as_ref().try_into() {
                    let pmsg = MainToPeerTask::TransactionNotification(pmsg);
                    self.main_to_peer_broadcast(pmsg);
                }
            }
        }

        // Tell miner that it can now continue either composing or guessing.
        self.main_to_miner_tx.send(MainToMiner::Continue);
    }

    /// Process a block whose PoW solution was solved by this client (or an
    /// external program) and has not been seen by the rest of the network yet.
    ///
    /// Shares block with all connected peers, updates own state, and updates
    /// any mempool transactions to be valid under this new block.
    ///
    /// Locking:
    ///  * acquires `global_state_lock` for read and write
    async fn handle_self_guessed_block(
        &mut self,
        main_loop_state: &mut MutableMainLoopState,
        new_block: Box<Block>,
    ) -> Result<()> {
        let new_block_hash = new_block.hash();

        // clone block in advance, so lock is held less time.
        let new_block_clone = (*new_block).clone();

        // important! the is_canonical check and set_new_tip() need to be an
        // atomic operation, i.e. called within the same write-lock acquisition.
        //
        // this avoids a race condition where blocks B and C are both more
        // canonical than A, but B is more canonical than C, yet C replaces B
        // because it was only checked against A.
        //
        // we release the lock as quickly as possible.
        let update_jobs = {
            let mut gsm = self.global_state_lock.lock_guard_mut().await;

            // bail out if incoming block is not more canonical than present tip.
            if !gsm.incoming_block_is_more_canonical(&new_block) {
                drop(gsm); // drop lock right away before send.
                warn!("Got new block from miner that was not child of tip. Discarding.");
                self.main_to_miner_tx.send(MainToMiner::Continue);
                return Ok(());
            }

            let update_jobs = gsm.set_new_tip(new_block_clone).await?;
            gsm.flush_databases().await?;
            update_jobs
        };

        // Share block with peers right away.
        let pmsg = MainToPeerTask::Block(new_block);
        self.main_to_peer_broadcast(pmsg);

        info!("Locally-mined block is new tip: {}", new_block_hash);
        info!("broadcasting new block to peers");

        self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

        Ok(())
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_miner_task_message(
        &mut self,
        msg: MinerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<Option<i32>> {
        match msg {
            MinerToMain::NewBlockFound(new_block_info) => {
                log_slow_scope!(fn_name!() + "::MinerToMain::NewBlockFound");

                let new_block = new_block_info.block;

                info!("Miner found new block: {}", new_block.kernel.header.height);
                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
            }
            MinerToMain::BlockProposal(boxed_proposal) => {
                let (block, expected_utxos) = *boxed_proposal;

                // If block proposal from miner does not build on current tip,
                // don't broadcast it. This check covers reorgs as well.
                let current_tip = self
                    .global_state_lock
                    .lock_guard()
                    .await
                    .chain
                    .light_state()
                    .clone();
                if block.header().prev_block_digest != current_tip.hash() {
                    warn!(
                        "Got block proposal from miner that does not build on current tip. \
                           Rejecting. If this happens a lot, then maybe this machine is too \
                           slow to competitively compose blocks. Consider running the client only \
                           with the guesser flag set and not the compose flag."
                    );
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                // Ensure proposal validity before sharing
                if !block
                    .is_valid(
                        &current_tip,
                        block.header().timestamp,
                        self.global_state_lock.cli().network,
                    )
                    .await
                {
                    error!("Own block proposal invalid. This should not happen.");
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                if !self.global_state_lock.cli().secret_compositions {
                    let pmsg = MainToPeerTask::BlockProposalNotification((&block).into());
                    self.main_to_peer_broadcast(pmsg);
                }

                {
                    // Use block proposal and add expected UTXOs from this
                    // proposal.
                    let mut state = self.global_state_lock.lock_guard_mut().await;
                    state.mining_state.block_proposal =
                        BlockProposal::own_proposal(block.clone(), expected_utxos.clone());
                    state.wallet_state.add_expected_utxos(expected_utxos).await;
                }

                // Indicate to miner that block proposal was successfully
                // received by main-loop.
                self.main_to_miner_tx.send(MainToMiner::Continue);
            }
            MinerToMain::Shutdown(exit_code) => {
                return Ok(Some(exit_code));
            }
        }

        Ok(None)
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_peer_task_message(
        &mut self,
        msg: PeerTaskToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<()> {
        debug!("Received {} from a peer task", msg.get_type());
        let cli_args = self.global_state_lock.cli().clone();
        match msg {
            PeerTaskToMain::NewBlocks(blocks) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::NewBlocks");

                let last_block = blocks.last().unwrap().to_owned();
                let update_jobs = {
                    // The peer tasks also check whether the block is more
                    // canonical than the current tip, but we have to check it
                    // again since the block update might have already been
                    // applied through a message from another peer (or from our
                    // own miner).
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let new_canonical =
                        global_state_mut.incoming_block_is_more_canonical(&last_block);

                    if !new_canonical {
                        // The blocks are not canonical, but: if we are in sync
                        // mode and these blocks beat our current champion, then
                        // we store them anyway, without marking them as tip.
                        let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else {
                            warn!(
                                "Blocks were not new, and we're not syncing. Not storing blocks."
                            );
                            return Ok(());
                        };
                        if sync_anchor
                            .champion
                            .is_some_and(|(height, _)| height >= last_block.header().height)
                        {
                            warn!("Repeated blocks received in sync mode, not storing");
                            return Ok(());
                        }

                        sync_anchor.catch_up(last_block.header().height, last_block.hash());

                        for block in blocks {
                            global_state_mut.store_block_not_tip(block).await?;
                        }

                        global_state_mut.flush_databases().await?;

                        return Ok(());
                    }

                    info!(
                        "Last block from peer is new canonical tip: {}; height: {}",
                        last_block.hash(),
                        last_block.header().height
                    );

                    // Ask miner to stop work until state update is completed
                    self.main_to_miner_tx.send(MainToMiner::WaitForContinue);

                    // Get out of sync mode if needed
                    if global_state_mut.net.sync_anchor.is_some() {
                        let stay_in_sync_mode = stay_in_sync_mode(
                            &last_block.kernel.header,
                            &main_loop_state.sync_state,
                            cli_args.sync_mode_threshold,
                        );
                        if !stay_in_sync_mode {
                            info!("Exiting sync mode");
                            global_state_mut.net.sync_anchor = None;
                            self.main_to_miner_tx.send(MainToMiner::StopSyncing);
                        }
                    }

                    let mut update_jobs: Vec<MempoolUpdateJob> = vec![];
                    for new_block in blocks {
                        debug!(
                            "Storing block {} in database. Height: {}, Mined: {}",
                            new_block.hash(),
                            new_block.kernel.header.height,
                            new_block.kernel.header.timestamp.standard_format()
                        );

                        // Potential race condition here: if the last block is
                        // new and canonical but the first block is already
                        // known, we'll store the same block twice. That should
                        // be OK though, as the appropriate database entries are
                        // simply overwritten with the new block info. See the
                        // [GlobalState::tests::setting_same_tip_twice_is_allowed]
                        // test for a test of this phenomenon.

                        let update_jobs_ = global_state_mut.set_new_tip(new_block).await?;
                        update_jobs.extend(update_jobs_);
                    }

                    global_state_mut.flush_databases().await?;

                    update_jobs
                };

                // Inform all peers about new block
                let pmsg = MainToPeerTask::Block(Box::new(last_block.clone()));
                self.main_to_peer_broadcast(pmsg);

                // Spawn task to handle mempool tx-updating after new blocks.
                // TODO: Do clever trick to collapse all jobs relating to the same transaction,
                //       identified by transaction-ID, into *one* update job.
                self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

                // Inform miner about new block.
                self.main_to_miner_tx.send(MainToMiner::NewBlock);
            }
            PeerTaskToMain::AddPeerMaxBlockHeight {
                peer_address,
                claimed_height,
                claimed_cumulative_pow,
                claimed_block_mmra,
            } => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight");

                let claimed_state =
                    PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow);
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .insert(peer_address, claimed_state);

                // Check if synchronization mode should be activated.
                // Synchronization mode is entered if accumulated PoW exceeds
                // our tip and if the height difference is positive and beyond
                // a threshold value.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow)
                    && global_state_mut
                        .net
                        .sync_anchor
                        .as_ref()
                        .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow)
                {
                    info!(
                        "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}",
                        peer_address, claimed_height, claimed_cumulative_pow
                    );
                    global_state_mut.net.sync_anchor =
                        Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra));
                    self.main_to_miner_tx.send(MainToMiner::StartSyncing);
                }
            }
            PeerTaskToMain::RemovePeerMaxBlockHeight(socket_addr) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::RemovePeerMaxBlockHeight");

                debug!(
                    "Removing max block height from sync data structure for peer {}",
                    socket_addr
                );
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .remove(&socket_addr);

                // Get out of sync mode if needed.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;

                if global_state_mut.net.sync_anchor.is_some() {
                    let stay_in_sync_mode = stay_in_sync_mode(
                        global_state_mut.chain.light_state().header(),
                        &main_loop_state.sync_state,
                        cli_args.sync_mode_threshold,
                    );
                    if !stay_in_sync_mode {
                        info!("Exiting sync mode");
                        global_state_mut.net.sync_anchor = None;
                    }
                }
            }
            PeerTaskToMain::PeerDiscoveryAnswer((pot_peers, reported_by, distance)) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer");

                let max_peers = self.global_state_lock.cli().max_num_peers;
                for pot_peer in pot_peers {
                    main_loop_state.potential_peers.add(
                        reported_by,
                        pot_peer,
                        max_peers,
                        distance,
                        self.now(),
                    );
                }
            }
            PeerTaskToMain::Transaction(pt2m_transaction) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::Transaction");

                debug!(
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    pt2m_transaction.transaction.kernel.inputs.len(),
                    pt2m_transaction.transaction.kernel.outputs.len(),
                    pt2m_transaction.transaction.kernel.mutator_set_hash
                );

                {
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    if pt2m_transaction.confirmable_for_block
                        != global_state_mut.chain.light_state().hash()
                    {
                        warn!("main loop got unmined transaction with bad mutator set data, discarding transaction");
                        return Ok(());
                    }

                    global_state_mut
                        .mempool_insert(
                            pt2m_transaction.transaction.to_owned(),
                            UpgradePriority::Irrelevant,
                        )
                        .await;
                }

                // send notification to peers
                let transaction_notification: TransactionNotification =
                    (&pt2m_transaction.transaction).try_into()?;

                let pmsg = MainToPeerTask::TransactionNotification(transaction_notification);
                self.main_to_peer_broadcast(pmsg);
            }
            PeerTaskToMain::BlockProposal(block) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::BlockProposal");

                debug!("main loop received block proposal from peer loop");

                // Due to race conditions, we need to verify that this
                // block proposal is still the immediate child of tip. If it is,
                // and it has a higher guesser fee than what we're currently
                // working on, then we switch to this, and notify the miner to
                // mine on this new block. We don't need to verify the block's
                // validity, since that was done in peer loop.
                // To ensure atomicity, a write-lock must be held over global
                // state while we check if this proposal is favorable.
                {
                    info!("Received new favorable block proposal for mining operation.");
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let verdict = global_state_mut.favor_incoming_block_proposal(
                        block.header().prev_block_digest,
                        block
                            .total_guesser_reward()
                            .expect("block received by main loop must have guesser reward"),
                    );
                    if let Err(reject_reason) = verdict {
                        warn!("main loop got unfavorable block proposal. Reason: {reject_reason}");
                        return Ok(());
                    }

                    global_state_mut.mining_state.block_proposal =
                        BlockProposal::foreign_proposal(*block.clone());
                }

                // Notify all peers of the block proposal we just accepted
                let pmsg = MainToPeerTask::BlockProposalNotification((&*block).into());
                self.main_to_peer_broadcast(pmsg);

                self.main_to_miner_tx.send(MainToMiner::NewBlockProposal);
            }
            PeerTaskToMain::DisconnectFromLongestLivedPeer => {
                let global_state = self.global_state_lock.lock_guard().await;

                // get all peers
                let all_peers = global_state.net.peer_map.iter();

                // filter out CLI peers
                let disconnect_candidates =
                    all_peers.filter(|p| !global_state.cli_peers().contains(p.0));

                // find the one with the oldest connection
                let longest_lived_peer = disconnect_candidates.min_by(
                    |(_socket_address_left, peer_info_left),
                     (_socket_address_right, peer_info_right)| {
                        peer_info_left
                            .connection_established()
                            .cmp(&peer_info_right.connection_established())
                    },
                );

                // tell to disconnect
                if let Some((peer_socket, _peer_info)) = longest_lived_peer {
                    let pmsg = MainToPeerTask::Disconnect(peer_socket.to_owned());
                    self.main_to_peer_broadcast(pmsg);
                }
            }
        }

        Ok(())
    }

    /// If necessary, disconnect from peers.
    ///
    /// While a reasonable effort is made to never have more connections than
    /// [`max_num_peers`](crate::config_models::cli_args::Args::max_num_peers),
    /// this is not guaranteed. For example, bootstrap nodes temporarily allow a
    /// surplus of incoming connections to provide their service more reliably.
    ///
    /// Never disconnects peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn prune_peers(&self) -> Result<()> {
        // fetch all relevant info from global state; don't hold the lock
        let cli_args = self.global_state_lock.cli();
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .values()
            .cloned()
            .collect_vec();

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;
        if num_peers <= max_num_peers {
            debug!("No need to prune any peer connections.");
            return Ok(());
        }
        warn!("Connected to {num_peers} peers, which exceeds the maximum ({max_num_peers}).");

        // If all connections are outbound, it's OK to exceed the max.
        if connected_peers.iter().all(|p| p.connection_is_outbound()) {
            warn!("Not disconnecting from any peer because all connections are outbound.");
            return Ok(());
        }

        let num_peers_to_disconnect = num_peers - max_num_peers;
        let peers_to_disconnect = connected_peers
            .into_iter()
            .filter(|peer| !cli_args.peers.contains(&peer.connected_address()))
            .choose_multiple(&mut rand::rng(), num_peers_to_disconnect);
        match peers_to_disconnect.len() {
            0 => warn!("Not disconnecting from any peer because of manual override."),
            i => info!("Disconnecting from {i} peers."),
        }
        for peer in peers_to_disconnect {
            let pmsg = MainToPeerTask::Disconnect(peer.connected_address());
            self.main_to_peer_broadcast(pmsg);
        }

        Ok(())
    }
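
    // Worked example (hypothetical numbers): with 15 connections and
    // max_num_peers = 10, up to 15 - 10 = 5 peers are picked at random for
    // disconnection; peers given on the CLI are never picked.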

    /// If necessary, reconnect to the peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn reconnect(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .keys()
            .copied()
            .collect_vec();
        let peers_with_lost_connection = self
            .global_state_lock
            .cli()
            .peers
            .iter()
            .filter(|peer| !connected_peers.contains(peer));

        // If no connection was lost, there's nothing to do.
        if peers_with_lost_connection.clone().count() == 0 {
            return Ok(());
        }

        // Else, try to reconnect.
        let own_handshake_data = self
            .global_state_lock
            .lock_guard()
            .await
            .get_own_handshakedata();
        for &peer_with_lost_connection in peers_with_lost_connection {
            // Disallow reconnection if peer is in bad standing
            let peer_standing = self
                .global_state_lock
                .lock_guard()
                .await
                .net
                .get_peer_standing_from_database(peer_with_lost_connection.ip())
                .await;
            if peer_standing.is_some_and(|standing| standing.is_bad()) {
                info!("Not reconnecting to peer in bad standing: {peer_with_lost_connection}");
                continue;
            }

            info!("Attempting to reconnect to peer: {peer_with_lost_connection}");
            let global_state_lock = self.global_state_lock.clone();
            let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
            let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
            let own_handshake_data = own_handshake_data.clone();
            let outgoing_connection_task = tokio::task::Builder::new()
                .name("call_peer_wrapper_1")
                .spawn(async move {
                    call_peer(
                        peer_with_lost_connection,
                        global_state_lock,
                        main_to_peer_broadcast_rx,
                        peer_task_to_main_tx,
                        own_handshake_data,
                        1, // All CLI-specified peers have distance 1
                    )
                    .await;
                })?;
            main_loop_state.task_handles.push(outgoing_connection_task);
            main_loop_state.task_handles.retain(|th| !th.is_finished());
        }

        Ok(())
    }
1188

    /// Perform peer discovery.
    ///
    /// Peer discovery involves finding potential peers from connected peers
    /// and attempting to establish a connection with one of them.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn discover_peers(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        // fetch all relevant info from global state, then release the lock
        let cli_args = self.global_state_lock.cli();
        let global_state = self.global_state_lock.lock_guard().await;
        let connected_peers = global_state.net.peer_map.values().cloned().collect_vec();
        let own_instance_id = global_state.net.instance_id;
        let own_handshake_data = global_state.get_own_handshakedata();
        drop(global_state);

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;

        // Don't make an outgoing connection if
        // - the peer limit is reached (or exceeded), or
        // - the peer limit is _almost_ reached; reserve the last slot for an
        //   incoming connection.
        if num_peers >= max_num_peers || (num_peers > 2 && num_peers + 1 == max_num_peers) {
            info!("Connected to {num_peers} peers. The configured max is {max_num_peers} peers.");
            info!("Skipping peer discovery.");
            return Ok(());
        }

        info!("Performing peer discovery");

        // Ask all peers for their peer lists. This will eventually – once the
        // responses have come in – update the list of potential peers.
        let pmsg = MainToPeerTask::MakePeerDiscoveryRequest;
        self.main_to_peer_broadcast(pmsg);

        // Get a peer candidate from the list of potential peers. Generally,
        // the peer lists requested in the previous step will not have come in
        // yet. Therefore, the new candidate is selected based on somewhat
        // (but not overly) old information.
        let Some((peer_candidate, candidate_distance)) = main_loop_state
            .potential_peers
            .get_candidate(&connected_peers, own_instance_id)
        else {
            info!("Found no peer candidate to connect to. Not making new connection.");
            return Ok(());
        };

        // Try to connect to the selected candidate.
        info!("Connecting to peer {peer_candidate} with distance {candidate_distance}");
        let global_state_lock = self.global_state_lock.clone();
        let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
        let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
        let outgoing_connection_task = tokio::task::Builder::new()
            .name("call_peer_wrapper_2")
            .spawn(async move {
                call_peer(
                    peer_candidate,
                    global_state_lock,
                    main_to_peer_broadcast_rx,
                    peer_task_to_main_tx,
                    own_handshake_data,
                    candidate_distance,
                )
                .await;
            })?;
        main_loop_state.task_handles.push(outgoing_connection_task);
        main_loop_state.task_handles.retain(|th| !th.is_finished());

        // Immediately request the new peer's peer list. This allows
        // incorporating the new peer's peers into the list of potential peers,
        // to be used in the next round of peer discovery.
        let m2pmsg = MainToPeerTask::MakeSpecificPeerDiscoveryRequest(peer_candidate);
        self.main_to_peer_broadcast(m2pmsg);

        Ok(())
    }
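
    // Example of the capacity policy above, in round numbers (illustrative
    // only): with `max_num_peers = 10`, outgoing discovery stops once 9 peers
    // are connected, keeping the final slot free for an incoming connection;
    // with 10 or more connected peers the first clause applies directly.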
    /// Return a list of block heights for a block-batch request.
    ///
    /// Returns an ordered list of block heights, most preferred first, where
    /// the current tip is always the most preferred block to build on.
    ///
    /// Uses a growth factor to ensure that the peer will always have
    /// something to build on top of, by providing potential starting points
    /// all the way back to genesis.
    fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec<BlockHeight> {
        const FACTOR: f64 = 1.07f64;

        let mut look_behind = 0;
        let mut ret = vec![];

        // A factor of 1.07 can look back ~1m blocks in 200 digests.
        while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 {
            let height = match own_tip_height.checked_sub(look_behind) {
                None => break,
                Some(height) if height.is_genesis() => break,
                Some(height) => height,
            };

            ret.push(height);
            look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64;
        }

        ret.push(BlockHeight::genesis());

        ret
    }
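
    // For intuition, a worked example with a hypothetical tip at height 1000:
    // the look-behind evolves as `lb ← ⌊(lb + 1) · 1.07⌋`, i.e.
    // 0, 1, 2, ..., 13, 14, 16, 18, 20, 22, 24, ..., so the returned heights
    // are dense near the tip and sparse towards genesis:
    //
    //     let heights = Self::batch_request_uca_candidate_heights(1000u64.into());
    //     assert_eq!(heights[0], 1000u64.into()); // own tip comes first
    //     assert!(heights.last().unwrap().is_genesis()); // genesis comes last
    //
    // Because the look-behind grows geometrically, a window of a million-plus
    // blocks is covered well within the digest budget, as the comment in the
    // loop above notes. The unit tests at the bottom of this file exercise
    // exactly these invariants.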
    /// Logic for requesting the batch-download of blocks from peers
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let global_state = self.global_state_lock.lock_guard().await;

        // Check if we are in sync mode
        let Some(anchor) = &global_state.net.sync_anchor else {
            return Ok(());
        };

        info!("Running sync");

        let (own_tip_hash, own_tip_height, own_cumulative_pow) = (
            global_state.chain.light_state().hash(),
            global_state.chain.light_state().kernel.header.height,
            global_state
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work,
        );

        // Check if sync mode has timed out entirely, in which case it should
        // be abandoned.
        let anchor = anchor.to_owned();
        if self.now().duration_since(anchor.updated)? > GLOBAL_SYNCHRONIZATION_TIMEOUT {
            warn!("Sync mode has timed out. Abandoning sync mode.");

            // Abandon attempt, and punish all peers claiming to serve these
            // blocks.
            drop(global_state);
            self.global_state_lock
                .lock_guard_mut()
                .await
                .net
                .sync_anchor = None;

            let peers_to_punish = main_loop_state
                .sync_state
                .get_potential_peers_for_sync_request(own_cumulative_pow);

            for peer in peers_to_punish {
                let pmsg = MainToPeerTask::PeerSynchronizationTimeout(peer);
                self.main_to_peer_broadcast(pmsg);
            }

            return Ok(());
        }

        let (peer_to_sanction, try_new_request): (Option<SocketAddr>, bool) = main_loop_state
            .sync_state
            .get_status_of_last_request(own_tip_height, self.now());

        // Sanction peer if they failed to respond
        if let Some(peer) = peer_to_sanction {
            let pmsg = MainToPeerTask::PeerSynchronizationTimeout(peer);
            self.main_to_peer_broadcast(pmsg);
        }

        if !try_new_request {
            info!("Waiting for last sync to complete.");
            return Ok(());
        }

        // Create the next request from the reported sync state.
        info!("Creating new sync request");

        // Pick a random peer that has reported having relevant blocks
        let candidate_peers = main_loop_state
            .sync_state
            .get_potential_peers_for_sync_request(own_cumulative_pow);
        let chosen_peer = candidate_peers.choose(&mut rand::rng());
        assert!(
            chosen_peer.is_some(),
            "A synchronization candidate must be available for a request. \
            Otherwise, the data structure is in an invalid state and syncing should not be active"
        );

        let ordered_preferred_block_digests = match anchor.champion {
            Some((_height, digest)) => vec![digest],
            None => {
                // Find candidate-UCA digests based on a sparse distribution of
                // block heights skewed towards own tip height
                let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height);
                let mut ordered_preferred_block_digests = vec![];
                for height in request_heights {
                    let digest = global_state
                        .chain
                        .archival_state()
                        .archival_block_mmr
                        .ammr()
                        .get_leaf_async(height.into())
                        .await;
                    ordered_preferred_block_digests.push(digest);
                }
                ordered_preferred_block_digests
            }
        };

        // Send message to the relevant peer loop to request the blocks
        let chosen_peer = chosen_peer.unwrap();
        info!(
            "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}",
            chosen_peer, own_tip_hash, own_tip_height
        );
        let pmsg = MainToPeerTask::RequestBlockBatch(MainToPeerTaskBatchBlockRequest {
            peer_addr_target: *chosen_peer,
            known_blocks: ordered_preferred_block_digests,
            anchor_mmr: anchor.block_mmr.clone(),
        });
        self.main_to_peer_broadcast(pmsg);

        // Record that this request was sent to the peer
        let requested_block_height = own_tip_height.next();
        main_loop_state
            .sync_state
            .record_request(requested_block_height, *chosen_peer, self.now());

        Ok(())
    }
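
    // Note on the request construction above: once a concrete preferred fork
    // tip ("champion") is known, its digest alone anchors the batch request;
    // before that, the sparse spread of own-chain digests lets the peer find
    // the latest block both nodes agree on (a UCA candidate).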
    /// Scheduled task for upgrading the proofs of transactions in the mempool.
    ///
    /// Will either perform a merge of two transactions supported with single
    /// proofs, or will upgrade a transaction proof of the type
    /// `ProofCollection` to `SingleProof`.
    ///
    /// All proving takes place in a spawned task such that it doesn't block
    /// the main loop. The MutableMainLoopState gets the JoinHandle of the
    /// spawned upgrade task such that its status can be inspected.
    async fn proof_upgrader(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        fn attempt_upgrade(
            global_state: &GlobalState,
            main_loop_state: &MutableMainLoopState,
        ) -> bool {
            let previous_upgrade_task_is_still_running = main_loop_state
                .proof_upgrader_task
                .as_ref()
                .is_some_and(|x| !x.is_finished());
            global_state.cli().tx_proof_upgrading
                && global_state.net.sync_anchor.is_none()
                && global_state.proving_capability() == TxProvingCapability::SingleProof
                && !previous_upgrade_task_is_still_running
        }
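
        // All four conditions must hold before an upgrade is attempted:
        // upgrading is opt-in (`tx_proof_upgrading`), it is pointless while
        // syncing, it requires `SingleProof` proving capability, and it must
        // not preempt a still-running upgrade task.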
        trace!("Running proof upgrader scheduled task");

        // Check if it's time to run the proof-upgrader, and if we're capable
        // of upgrading a transaction proof.
        let upgrade_candidate = {
            let mut global_state = self.global_state_lock.lock_guard_mut().await;
            if !attempt_upgrade(&global_state, main_loop_state) {
                trace!("Not attempting upgrade.");
                return Ok(());
            }

            debug!("Attempting to run transaction-proof-upgrade");

            // Find a candidate for proof upgrade
            let Some(upgrade_candidate) = get_upgrade_task_from_mempool(&mut global_state).await
            else {
                debug!("Found no transaction-proof to upgrade");
                return Ok(());
            };

            upgrade_candidate
        };

        info!(
            "Attempting to upgrade transaction proofs of: {}",
            upgrade_candidate.affected_txids().iter().join("; ")
        );

        // Perform the upgrade, if we're not using the prover for anything else,
        // like mining, or proving our own transaction. Running the prover takes
        // a long time (minutes), so we spawn a task for this such that we do
        // not block the main loop.
        let vm_job_queue = vm_job_queue();

        let global_state_lock_clone = self.global_state_lock.clone();
        let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
        let proof_upgrader_task =
            tokio::task::Builder::new()
                .name("proof_upgrader")
                .spawn(async move {
                    upgrade_candidate
                        .handle_upgrade(
                            vm_job_queue,
                            global_state_lock_clone,
                            main_to_peer_broadcast_tx_clone,
                        )
                        .await
                })?;

        main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);

        Ok(())
    }

    /// Post-processing when a new block has arrived. Spawn a task to update
    /// transactions in the mempool. Only when the spawned task has completed
    /// should the miner continue.
    fn spawn_mempool_txs_update_job(
        &self,
        main_loop_state: &mut MutableMainLoopState,
        update_jobs: Vec<MempoolUpdateJob>,
    ) {
        // Job completion of the spawned task is communicated through the
        // `update_mempool_receiver` channel.
        let vm_job_queue = vm_job_queue();
        if let Some(handle) = main_loop_state.update_mempool_txs_handle.as_ref() {
            handle.abort();
        }
        let (update_sender, update_receiver) =
            mpsc::channel::<Vec<MempoolUpdateJobResult>>(TX_UPDATER_CHANNEL_CAPACITY);

        // note: if this task is cancelled, the job will continue
        // because TritonVmJobOptions::cancel_job_rx is None.
        // see how compose_task handles cancellation in mine_loop.
        let job_options = self
            .global_state_lock
            .cli()
            .proof_job_options(TritonVmJobPriority::Highest);
        let global_state_lock = self.global_state_lock.clone();
        main_loop_state.update_mempool_txs_handle = Some(
            tokio::task::Builder::new()
                .name("mempool tx ms-updater")
                .spawn(async move {
                    Self::update_mempool_jobs(
                        global_state_lock,
                        update_jobs,
                        vm_job_queue.clone(),
                        update_sender,
                        job_options,
                    )
                    .await
                })
                .unwrap(),
        );
        main_loop_state.update_mempool_receiver = update_receiver;
    }
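
    // The method above follows an "abort-and-replace" pattern: at most one
    // mempool-update task is alive, and work for a newer block supersedes any
    // in-flight update. A minimal standalone sketch of the same idea (plain
    // tokio, not this codebase's types):
    //
    //     async fn respawn(slot: &mut Option<tokio::task::JoinHandle<()>>) {
    //         if let Some(old) = slot.take() {
    //             old.abort(); // newer work supersedes older work
    //         }
    //         *slot = Some(tokio::spawn(async { /* perform the new job */ }));
    //     }
    //
    // Completion is then reported over a fresh mpsc channel so the main
    // loop's `select!` can react to results without polling the join handle.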
    pub async fn run(&mut self) -> Result<i32> {
        info!("Starting main loop");

        let task_handles = std::mem::take(&mut self.task_handles);

        // Handle incoming connections, messages from peer tasks, and messages from the mining task
        let mut main_loop_state = MutableMainLoopState::new(task_handles);

        // Set up various timers.
        //
        // The `MissedTickBehavior::Delay` is appropriate for tasks that don't
        // do anything meaningful if executed in quick succession. For example,
        // pruning stale information immediately after pruning stale information
        // is almost certainly a no-op.
        // Similarly, tasks performing network operations (e.g., peer discovery)
        // should probably not try to “catch up” if some ticks were missed.
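        //
        // A minimal illustration of the `Delay` policy, independent of this
        // loop: a tick that fires late schedules the next tick one full
        // period after the late one, instead of firing in a burst to catch
        // up.
        //
        //     use tokio::time::{interval, Duration, MissedTickBehavior};
        //     let mut ticker = interval(Duration::from_secs(60));
        //     ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        //     loop {
        //         ticker.tick().await; // never fires in a burst after a stall
        //         // ... periodic work ...
        //     }
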
        // Don't run peer discovery immediately at startup since outgoing
        // connections started from lib.rs may not have finished yet.
        let mut peer_discovery_interval = time::interval_at(
            Instant::now() + PEER_DISCOVERY_INTERVAL,
            PEER_DISCOVERY_INTERVAL,
        );
        peer_discovery_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut block_sync_interval = time::interval(SYNC_REQUEST_INTERVAL);
        block_sync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut mempool_cleanup_interval = time::interval(MEMPOOL_PRUNE_INTERVAL);
        mempool_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut utxo_notification_cleanup_interval = time::interval(EXPECTED_UTXOS_PRUNE_INTERVAL);
        utxo_notification_cleanup_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut mp_resync_interval = time::interval(MP_RESYNC_INTERVAL);
        mp_resync_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut tx_proof_upgrade_interval = time::interval(PROOF_UPGRADE_INTERVAL);
        tx_proof_upgrade_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        // Spawn tasks to monitor for SIGTERM, SIGINT, and SIGQUIT. These
        // signals are only used on Unix systems.
        let (tx_term, mut rx_term) = mpsc::channel::<()>(2);
        let (tx_int, mut rx_int) = mpsc::channel::<()>(2);
        let (tx_quit, mut rx_quit) = mpsc::channel::<()>(2);
        #[cfg(unix)]
        {
            use tokio::signal::unix::signal;
            use tokio::signal::unix::SignalKind;

            // Monitor for SIGTERM
            let mut sigterm = signal(SignalKind::terminate())?;
            tokio::task::Builder::new()
                .name("sigterm_handler")
                .spawn(async move {
                    if sigterm.recv().await.is_some() {
                        info!("Received SIGTERM");
                        tx_term.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGINT
            let mut sigint = signal(SignalKind::interrupt())?;
            tokio::task::Builder::new()
                .name("sigint_handler")
                .spawn(async move {
                    if sigint.recv().await.is_some() {
                        info!("Received SIGINT");
                        tx_int.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGQUIT
            let mut sigquit = signal(SignalKind::quit())?;
            tokio::task::Builder::new()
                .name("sigquit_handler")
                .spawn(async move {
                    if sigquit.recv().await.is_some() {
                        info!("Received SIGQUIT");
                        tx_quit.send(()).await.unwrap();
                    }
                })?;
        }

        #[cfg(not(unix))]
        drop((tx_term, tx_int, tx_quit));
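
        // On non-Unix targets the senders are dropped immediately, so the
        // corresponding `recv()` arms in the `select!` below resolve to
        // `None` and those branches simply never fire.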

        let exit_code: i32 = loop {
            select! {
                Ok(()) = signal::ctrl_c() => {
                    info!("Detected Ctrl+c signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Monitor for SIGTERM, SIGINT, and SIGQUIT.
                Some(_) = rx_term.recv() => {
                    info!("Detected SIGTERM signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_int.recv() => {
                    info!("Detected SIGINT signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_quit.recv() => {
                    info!("Detected SIGQUIT signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Handle incoming connections from peers
                Ok((stream, peer_address)) = self.incoming_peer_listener.accept() => {
                    // Return early if no incoming connections are accepted. Do
                    // not send application handshake.
                    if self.global_state_lock.cli().disallow_all_incoming_peer_connections() {
                        warn!("Got incoming connection despite not accepting any. Ignoring");
                        continue;
                    }

                    let state = self.global_state_lock.lock_guard().await;
                    let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> = self.main_to_peer_broadcast_tx.subscribe();
                    let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = self.peer_task_to_main_tx.clone();
                    let own_handshake_data: HandshakeData = state.get_own_handshakedata();
                    let global_state_lock = self.global_state_lock.clone(); // bump arc refcount.
                    let incoming_peer_task_handle = tokio::task::Builder::new()
                        .name("answer_peer_wrapper")
                        .spawn(async move {
                        match answer_peer(
                            stream,
                            global_state_lock,
                            peer_address,
                            main_to_peer_broadcast_rx_clone,
                            peer_task_to_main_tx_clone,
                            own_handshake_data,
                        ).await {
                            Ok(()) => (),
                            Err(err) => error!("Got error: {:?}", err),
                        }
                    })?;
                    main_loop_state.task_handles.push(incoming_peer_task_handle);
                    main_loop_state.task_handles.retain(|th| !th.is_finished());
                }

                // Handle messages from peer tasks
                Some(msg) = self.peer_task_to_main_rx.recv() => {
                    debug!("Received message sent to main task.");
                    self.handle_peer_task_message(
                        msg,
                        &mut main_loop_state,
                    )
                    .await?
                }

                // Handle messages from the miner task
                Some(main_message) = self.miner_to_main_rx.recv() => {
                    let exit_code = self.handle_miner_task_message(main_message, &mut main_loop_state).await?;

                    if let Some(exit_code) = exit_code {
                        break exit_code;
                    }
                }

                // Handle the completion of mempool tx-update jobs after a new block.
                Some(ms_updated_transactions) = main_loop_state.update_mempool_receiver.recv() => {
                    self.handle_updated_mempool_txs(ms_updated_transactions).await;
                }

                // Handle messages from the RPC server task
                Some(rpc_server_message) = self.rpc_server_to_main_rx.recv() => {
                    let shutdown_after_execution = self.handle_rpc_server_message(rpc_server_message.clone(), &mut main_loop_state).await?;
                    if shutdown_after_execution {
                        break SUCCESS_EXIT_CODE
                    }
                }

                // Handle peer discovery
                _ = peer_discovery_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::peer_discovery_interval");

                    // Check the number of peers we are connected to and
                    // connect to more peers if needed.
                    debug!("Timer: peer discovery job");

                    // This check makes regtest mode behave in a local,
                    // controlled way: no regtest node attempts to discover
                    // other nodes, so the only peers are those added manually.
                    // see: https://github.com/Neptune-Crypto/neptune-core/issues/539#issuecomment-2764701027
                    if self.global_state_lock.cli().network.performs_peer_discovery() {
                        self.prune_peers().await?;
                        self.reconnect(&mut main_loop_state).await?;
                        self.discover_peers(&mut main_loop_state).await?;
                    } else {
                        debug!("peer discovery disabled for network {}", self.global_state_lock.cli().network);
                    }
                }

                // Handle synchronization (i.e. batch-downloading of blocks)
                _ = block_sync_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::block_sync_interval");

                    trace!("Timer: block-synchronization job");
                    self.block_sync(&mut main_loop_state).await?;
                }

                // Clean up mempool: remove stale / too old transactions
                _ = mempool_cleanup_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::mempool_cleanup_interval");

                    debug!("Timer: mempool-cleaner job");
                    self
                        .global_state_lock
                        .lock_guard_mut()
                        .await
                        .mempool_prune_stale_transactions()
                        .await;
                }

                // Clean up incoming UTXO notifications: remove stale / too old
                // UTXO notifications from the pool
                _ = utxo_notification_cleanup_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::utxo_notification_cleanup_interval");

                    debug!("Timer: UTXO notification pool cleanup job");

                    // Danger: possible loss of funds.
                    //
                    // See description of prune_stale_expected_utxos().
                    //
                    // This call is disabled until such time as a thorough
                    // evaluation and perhaps reimplementation determines that
                    // it can be called safely without possible loss of funds.
                    // self.global_state_lock.lock_mut(|s| s.wallet_state.prune_stale_expected_utxos()).await;
                }

                // Handle membership proof resynchronization
                _ = mp_resync_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::mp_resync_interval");

                    debug!("Timer: Membership proof resync job");
                    self.global_state_lock.resync_membership_proofs().await?;
                }

                // Run the proof upgrader. The callee checks whether proof
                // upgrading should be done.
                _ = tx_proof_upgrade_interval.tick() => {
                    log_slow_scope!(fn_name!() + "::select::tx_proof_upgrade_interval");

                    trace!("Timer: tx-proof-upgrader");
                    self.proof_upgrader(&mut main_loop_state).await?;
                }
            }
        };

        self.graceful_shutdown(main_loop_state.task_handles).await?;
        info!("Shutdown completed.");

        Ok(exit_code)
    }

    /// Handle messages from the RPC server. Returns `true` iff the client
    /// should shut down after handling this message.
    async fn handle_rpc_server_message(
        &mut self,
        msg: RPCServerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<bool> {
        match msg {
            RPCServerToMain::BroadcastTx(transaction) => {
                debug!(
                    "`main` received following transaction from RPC Server. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    transaction.kernel.inputs.len(),
                    transaction.kernel.outputs.len(),
                    transaction.kernel.mutator_set_hash
                );

                // note: this Tx must already have been added to the mempool by
                // sender.  This occurs in GlobalStateLock::record_transaction().

                // Is this a transaction we can share with peers? If so, share
                // it immediately.
                if let Ok(notification) = transaction.as_ref().try_into() {
                    let pmsg = MainToPeerTask::TransactionNotification(notification);
                    self.main_to_peer_broadcast(pmsg);
                } else {
                    // Otherwise, the transaction cannot be shared immediately
                    // because it contains secret data. Upgrade its proof
                    // quality, and share it by spinning up the proof upgrader.
                    let primitive_witness = transaction.proof.clone().into_primitive_witness();

                    let vm_job_queue = vm_job_queue();

                    let proving_capability = self.global_state_lock.cli().proving_capability();
                    let network = self.global_state_lock.cli().network;
                    let upgrade_job = UpgradeJob::from_primitive_witness(
                        network,
                        proving_capability,
                        primitive_witness,
                    );

                    // note: handle_upgrade() hands off proving to the
                    //       triton-vm job queue and waits for job completion.
                    // note: handle_upgrade() broadcasts to peers on success.

                    let global_state_lock_clone = self.global_state_lock.clone();
                    let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
                    let _proof_upgrader_task = tokio::task::Builder::new()
                        .name("proof_upgrader")
                        .spawn(async move {
                        upgrade_job
                            .handle_upgrade(
                                vm_job_queue.clone(),
                                global_state_lock_clone,
                                main_to_peer_broadcast_tx_clone,
                            )
                            .await
                    })?;

                    // main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);
                }

                // do not shut down
                Ok(false)
            }
            RPCServerToMain::BroadcastMempoolTransactions => {
                info!("Broadcasting transaction notifications for all shareable transactions in mempool");
                let state = self.global_state_lock.lock_guard().await;
                let txs = state.mempool.fee_density_iter().collect_vec();
                for (txid, _) in txs {
                    // Since a read-lock is held over global state, the
                    // transaction must exist in the mempool.
                    let tx = state
                        .mempool
                        .get(txid)
                        .expect("Transaction from iter must exist in mempool");
                    let notification = TransactionNotification::try_from(tx);
                    match notification {
                        Ok(notification) => {
                            let pmsg = MainToPeerTask::TransactionNotification(notification);
                            self.main_to_peer_broadcast(pmsg);
                        }
                        Err(error) => {
                            warn!("{error}");
                        }
                    };
                }
                Ok(false)
            }
            RPCServerToMain::ClearMempool => {
                info!("Clearing mempool");
                self.global_state_lock
                    .lock_guard_mut()
                    .await
                    .mempool_clear()
                    .await;

                Ok(false)
            }
            RPCServerToMain::ProofOfWorkSolution(new_block) => {
                info!("Handling PoW solution from RPC call");

                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
                Ok(false)
            }
            RPCServerToMain::PauseMiner => {
                info!("Received RPC request to stop miner");

                self.main_to_miner_tx.send(MainToMiner::StopMining);
                Ok(false)
            }
            RPCServerToMain::RestartMiner => {
                info!("Received RPC request to start miner");
                self.main_to_miner_tx.send(MainToMiner::StartMining);
                Ok(false)
            }
            RPCServerToMain::Shutdown => {
                info!("Received RPC shutdown request.");

                // shut down
                Ok(true)
            }
        }
    }

    async fn graceful_shutdown(&mut self, join_handles: Vec<JoinHandle<()>>) -> Result<()> {
        info!("Shutdown initiated.");

        // Stop mining
        self.main_to_miner_tx.send(MainToMiner::Shutdown);

        // Send 'bye' message to all peers.
        let pmsg = MainToPeerTask::DisconnectAll();
        self.main_to_peer_broadcast(pmsg);
        debug!("sent bye");

        // Flush all databases
        self.global_state_lock.flush_databases().await?;

        tokio::time::sleep(Duration::from_millis(50)).await;

        // Child tasks should have finished by now. If not, abort them.
        for jh in &join_handles {
            jh.abort();
        }
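        // Note: aborting a task that has already finished is a no-op, so
        // calling `abort()` unconditionally here is safe.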

        // wait for all to finish.
        futures::future::join_all(join_handles).await;

        Ok(())
    }

    // Broadcasts a message to peers (if any are connected).
    //
    // Panics if the broadcast failed while the channel's receiver_count is
    // non-zero, indicating that we have peer connections.
    fn main_to_peer_broadcast(&self, msg: MainToPeerTask) {
        if let Err(e) = self.main_to_peer_broadcast_tx.send(msg) {
            // tbd: maybe we should just log an error and ignore rather
            // than panic.  but for now this preserves prior behavior
            let receiver_count = self.main_to_peer_broadcast_tx.receiver_count();
            assert_eq!(
                receiver_count, 0,
                "failed to broadcast message from main to {} peer loops: {:?}",
                receiver_count, e
            );
        }
    }
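
    // Note on the assertion above: `tokio::sync::broadcast::Sender::send`
    // returns `Err` only when no receiver is currently subscribed, so a
    // failed send with `receiver_count() == 0` simply means that no peer
    // loops are connected.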
}

#[cfg(test)]
1974
#[cfg_attr(coverage_nightly, coverage(off))]
1975
mod tests {
1976
    use std::str::FromStr;
1977
    use std::time::UNIX_EPOCH;
1978

1979
    use macro_rules_attr::apply;
1980
    use tracing_test::traced_test;
1981

1982
    use super::*;
1983
    use crate::config_models::cli_args;
1984
    use crate::config_models::network::Network;
1985
    use crate::tests::shared::blocks::invalid_empty_block;
1986
    use crate::tests::shared::globalstate::get_dummy_peer_incoming;
1987
    use crate::tests::shared::globalstate::get_test_genesis_setup;
1988
    use crate::tests::shared_tokio_runtime;
1989
    use crate::MINER_CHANNEL_CAPACITY;
1990

1991
    impl MainLoopHandler {
1992
        fn mutable(&mut self) -> MutableMainLoopState {
1993
            MutableMainLoopState::new(std::mem::take(&mut self.task_handles))
1994
        }
1995
    }
1996

1997
    struct TestSetup {
1998
        main_loop_handler: MainLoopHandler,
1999
        main_to_peer_rx: broadcast::Receiver<MainToPeerTask>,
2000
    }
2001

2002
    async fn setup(
2003
        num_init_peers_outgoing: u8,
2004
        num_peers_incoming: u8,
2005
        cli: cli_args::Args,
2006
    ) -> TestSetup {
2007
        const CHANNEL_CAPACITY_MINER_TO_MAIN: usize = 10;
2008

2009
        let network = Network::Main;
2010
        let (
2011
            main_to_peer_tx,
2012
            main_to_peer_rx,
2013
            peer_to_main_tx,
2014
            peer_to_main_rx,
2015
            mut state,
2016
            _own_handshake_data,
2017
        ) = get_test_genesis_setup(network, num_init_peers_outgoing, cli)
2018
            .await
2019
            .unwrap();
2020
        assert!(
2021
            state
2022
                .lock_guard()
2023
                .await
2024
                .net
2025
                .peer_map
2026
                .iter()
2027
                .all(|(_addr, peer)| peer.connection_is_outbound()),
2028
            "Test assumption: All initial peers must represent outgoing connections."
2029
        );
2030

2031
        for i in 0..num_peers_incoming {
2032
            let peer_address = SocketAddr::from_str(&format!("255.254.253.{i}:8080")).unwrap();
2033
            state
2034
                .lock_guard_mut()
2035
                .await
2036
                .net
2037
                .peer_map
2038
                .insert(peer_address, get_dummy_peer_incoming(peer_address));
2039
        }
2040

2041
        let incoming_peer_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
2042

2043
        let (main_to_miner_tx, _main_to_miner_rx) =
2044
            mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
2045
        let (_miner_to_main_tx, miner_to_main_rx) =
2046
            mpsc::channel::<MinerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
2047
        let (_rpc_server_to_main_tx, rpc_server_to_main_rx) =
2048
            mpsc::channel::<RPCServerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
2049

2050
        let task_join_handles = vec![];
2051

2052
        let main_loop_handler = MainLoopHandler::new(
2053
            incoming_peer_listener,
2054
            state,
2055
            main_to_peer_tx,
2056
            peer_to_main_tx,
2057
            main_to_miner_tx,
2058
            peer_to_main_rx,
2059
            miner_to_main_rx,
2060
            rpc_server_to_main_rx,
2061
            task_join_handles,
2062
        );
2063
        TestSetup {
2064
            main_loop_handler,
2065
            main_to_peer_rx,
2066
        }
2067
    }
2068

2069
    #[apply(shared_tokio_runtime)]
2070
    async fn handle_self_guessed_block_new_tip() {
2071
        // A new tip is registered by main_loop. Verify correct state update.
2072
        let TestSetup {
2073
            mut main_loop_handler,
2074
            mut main_to_peer_rx,
2075
            ..
2076
        } = setup(1, 0, cli_args::Args::default()).await;
2077
        let network = main_loop_handler.global_state_lock.cli().network;
2078
        let mut mutable_main_loop_state = main_loop_handler.mutable();
2079

2080
        let block1 = invalid_empty_block(network, &Block::genesis(network));
2081

2082
        assert!(
2083
            main_loop_handler
2084
                .global_state_lock
2085
                .lock_guard()
2086
                .await
2087
                .chain
2088
                .light_state()
2089
                .header()
2090
                .height
2091
                .is_genesis(),
2092
            "Tip must be genesis prior to handling of new block"
2093
        );
2094

2095
        let block1 = Box::new(block1);
2096
        main_loop_handler
2097
            .handle_self_guessed_block(&mut mutable_main_loop_state, block1.clone())
2098
            .await
2099
            .unwrap();
2100
        let new_block_height: u64 = main_loop_handler
2101
            .global_state_lock
2102
            .lock_guard()
2103
            .await
2104
            .chain
2105
            .light_state()
2106
            .header()
2107
            .height
2108
            .into();
2109
        assert_eq!(
2110
            1u64, new_block_height,
2111
            "Tip height must be 1 after handling of new block"
2112
        );
2113
        let msg_to_peer_loops = main_to_peer_rx.recv().await.unwrap();
2114
        if let MainToPeerTask::Block(block_to_peers) = msg_to_peer_loops {
2115
            assert_eq!(
2116
                block1, block_to_peers,
2117
                "Peer loops must have received block 1"
2118
            );
2119
        } else {
2120
            panic!("Must have sent block notification to peer loops")
2121
        }
2122
    }
2123

2124
    mod update_mempool_txs {
2125
        use crate::api::export::NativeCurrencyAmount;
2126
        use crate::tests::shared::blocks::fake_deterministic_successor;
2127
        use crate::tests::shared::mock_tx::genesis_tx_with_proof_type;
2128

2129
        use super::*;
2130

2131
        #[traced_test]
2132
        #[apply(shared_tokio_runtime)]
2133
        async fn tx_ms_updating() {
2134
            // Create a transaction, and insert it into the mempool. Receive a
2135
            // block that does not include the transaction. Verify that the
2136
            // transaction is updated to be valid under the new mutator set
2137
            // after the application of the block and the invocation of the
2138
            // relevant functions.
2139
            let network = Network::Main;
2140
            let fee = NativeCurrencyAmount::coins(1);
2141

2142
            let genesis_block = Block::genesis(network);
2143
            let block1 = fake_deterministic_successor(&genesis_block, network).await;
2144
            let cli = cli_args::Args {
2145
                tx_proving_capability: Some(TxProvingCapability::SingleProof),
2146
                ..Default::default()
2147
            };
2148
            for tx_proving_capability in [
2149
                TxProvingCapability::PrimitiveWitness,
2150
                TxProvingCapability::ProofCollection,
2151
                TxProvingCapability::SingleProof,
2152
            ] {
2153
                let num_outgoing_connections = 0;
2154
                let num_incoming_connections = 0;
2155
                let TestSetup {
2156
                    mut main_loop_handler,
2157
                    mut main_to_peer_rx,
2158
                    ..
2159
                } = setup(
2160
                    num_outgoing_connections,
2161
                    num_incoming_connections,
2162
                    cli.clone(),
2163
                )
2164
                .await;
2165

2166
                // First insert a PW backed transaction to ensure PW is
2167
                // present, as this determines what MS-data updating jobs are
2168
                // returned.
2169
                let pw_tx =
2170
                    genesis_tx_with_proof_type(TxProvingCapability::PrimitiveWitness, network, fee)
2171
                        .await;
2172
                let tx = genesis_tx_with_proof_type(tx_proving_capability, network, fee).await;
2173
                let update_jobs = {
2174
                    let mut gsl = main_loop_handler.global_state_lock.lock_guard_mut().await;
2175
                    gsl.mempool_insert(pw_tx.into(), UpgradePriority::Critical)
2176
                        .await;
2177
                    gsl.mempool_insert(tx.clone().into(), UpgradePriority::Critical)
2178
                        .await;
2179
                    gsl.set_new_tip(block1.clone()).await.unwrap()
2180
                };
2181

2182
                assert_eq!(1, update_jobs.len(), "Must return 1 job for MS-updating");
2183

2184
                let (update_sender, mut update_receiver) =
2185
                    mpsc::channel::<Vec<MempoolUpdateJobResult>>(TX_UPDATER_CHANNEL_CAPACITY);
2186
                MainLoopHandler::update_mempool_jobs(
2187
                    main_loop_handler.global_state_lock.clone(),
2188
                    update_jobs,
2189
                    vm_job_queue(),
2190
                    update_sender,
2191
                    TritonVmProofJobOptions::default(),
2192
                )
2193
                .await;
2194

2195
                let msg = update_receiver.recv().await.unwrap();
2196
                assert_eq!(1, msg.len(), "Must return exactly one update result");
2197
                assert!(
2198
                    matches!(msg[0], MempoolUpdateJobResult::Success { .. }),
2199
                    "Update must be a success"
2200
                );
2201

2202
                main_loop_handler.handle_updated_mempool_txs(msg).await;
2203

2204
                // Verify that
2205
                // a) mempool contains the updated transaction, and
2206
                // b) that peers were informed of the new transaction, if the
2207
                //    transaction is shareable, i.e. is not only backed by a
2208
                //    primitive witness.
2209
                let txid = tx.txid();
2210
                let block1_msa = block1.mutator_set_accumulator_after().unwrap();
2211
                assert!(
2212
                    main_loop_handler
2213
                        .global_state_lock
2214
                        .lock_guard()
2215
                        .await
2216
                        .mempool
2217
                        .get(txid)
2218
                        .unwrap()
2219
                        .clone()
2220
                        .is_confirmable_relative_to(&block1_msa),
2221
                    "transaction must be updatable"
2222
                );
2223

2224
                if tx_proving_capability != TxProvingCapability::PrimitiveWitness {
2225
                    let peer_msg = main_to_peer_rx.recv().await.unwrap();
2226
                    let MainToPeerTask::TransactionNotification(tx_notification) = peer_msg else {
2227
                        panic!("Outgoing peer message must be tx notification");
2228
                    };
2229
                    assert_eq!(txid, tx_notification.txid);
2230
                    assert_eq!(block1_msa.hash(), tx_notification.mutator_set_hash);
2231
                }
2232
            }
2233
        }
2234
    }
2235

2236
    #[allow(clippy::explicit_deref_methods)] // suppress clippy's bad autosuggestion
2237
    mod sync_mode {
2238
        use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
2239
        use test_strategy::proptest;
2240

2241
        use super::*;
2242
        use crate::tests::shared::globalstate::get_dummy_socket_address;
2243

2244
        #[proptest]
2245
        fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) {
2246
            batch_request_heights_sanity(own_height);
2247
        }
2248

2249
        #[test]
2250
        fn batch_request_heights_unit() {
2251
            let own_height = 1_000_000u64;
2252
            batch_request_heights_sanity(own_height);
2253
        }
2254

2255
        fn batch_request_heights_sanity(own_height: u64) {
2256
            let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into());
2257

2258
            let mut heights_rev = heights.clone();
2259
            heights_rev.reverse();
2260
            assert!(
2261
                heights_rev.is_sorted(),
2262
                "Heights must be sorted from high-to-low"
2263
            );
2264

2265
            heights_rev.dedup();
2266
            assert_eq!(heights_rev.len(), heights.len(), "duplicates");
2267

2268
            assert_eq!(heights[0], own_height.into(), "starts with own tip height");
2269
            assert!(
2270
                heights.last().unwrap().is_genesis(),
2271
                "ends with genesis block"
2272
            );
2273
        }
2274

2275
        #[apply(shared_tokio_runtime)]
2276
        #[traced_test]
2277
        async fn sync_mode_abandoned_on_global_timeout() {
2278
            let num_outgoing_connections = 0;
2279
            let num_incoming_connections = 0;
2280
            let TestSetup {
2281
                mut main_loop_handler,
2282
                main_to_peer_rx: _main_to_peer_rx,
2283
                ..
2284
            } = setup(
2285
                num_outgoing_connections,
2286
                num_incoming_connections,
2287
                cli_args::Args::default(),
2288
            )
2289
            .await;
2290
            let mut mutable_main_loop_state = main_loop_handler.mutable();
2291

2292
            main_loop_handler
2293
                .block_sync(&mut mutable_main_loop_state)
2294
                .await
2295
                .expect("Must return OK when no sync mode is set");
2296

2297
            // Mock that we are in a valid sync state
2298
            let claimed_max_height = 1_000u64.into();
2299
            let claimed_max_pow = ProofOfWork::new([100; 6]);
2300
            main_loop_handler
2301
                .global_state_lock
2302
                .lock_guard_mut()
2303
                .await
2304
                .net
2305
                .sync_anchor = Some(SyncAnchor::new(
2306
                claimed_max_pow,
2307
                MmrAccumulator::new_from_leafs(vec![]),
2308
            ));
2309
            mutable_main_loop_state.sync_state.peer_sync_states.insert(
2310
                get_dummy_socket_address(0),
2311
                PeerSynchronizationState::new(claimed_max_height, claimed_max_pow),
2312
            );
2313

2314
            let sync_start_time = main_loop_handler
2315
                .global_state_lock
2316
                .lock_guard()
2317
                .await
2318
                .net
2319
                .sync_anchor
2320
                .as_ref()
2321
                .unwrap()
2322
                .updated;
2323
            main_loop_handler
2324
                .block_sync(&mut mutable_main_loop_state)
2325
                .await
2326
                .expect("Must return OK when sync mode has not timed out yet");
2327
            assert!(
2328
                main_loop_handler
2329
                    .global_state_lock
2330
                    .lock_guard()
2331
                    .await
2332
                    .net
2333
                    .sync_anchor
2334
                    .is_some(),
2335
                "Sync mode must still be set before timeout has occurred"
2336
            );
2337

2338
            assert_eq!(
2339
                sync_start_time,
2340
                main_loop_handler
2341
                    .global_state_lock
2342
                    .lock_guard()
2343
                    .await
2344
                    .net
2345
                    .sync_anchor
2346
                    .as_ref()
2347
                    .unwrap()
2348
                    .updated,
2349
                "timestamp may not be updated without state change"
2350
            );
2351

2352
            // Mock that sync-mode has timed out
2353
            main_loop_handler = main_loop_handler.with_mocked_time(
2354
                SystemTime::now() + GLOBAL_SYNCHRONIZATION_TIMEOUT + Duration::from_secs(1),
2355
            );
2356

2357
            main_loop_handler
2358
                .block_sync(&mut mutable_main_loop_state)
2359
                .await
2360
                .expect("Must return OK when sync mode has timed out");
2361
            assert!(
2362
                main_loop_handler
2363
                    .global_state_lock
2364
                    .lock_guard()
2365
                    .await
2366
                    .net
2367
                    .sync_anchor
2368
                    .is_none(),
2369
                "Sync mode must be unset on timeout"
2370
            );
2371
        }
2372
    }
2373

2374
    mod proof_upgrader {
2375
        use super::*;
2376
        use crate::models::blockchain::transaction::Transaction;
2377
        use crate::models::blockchain::transaction::TransactionProof;
2378
        use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
2379
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
2380
        use crate::models::proof_abstractions::timestamp::Timestamp;
2381
        use crate::models::state::tx_creation_config::TxCreationConfig;
2382
        use crate::models::state::wallet::transaction_output::TxOutput;
2383

2384
        async fn tx_no_outputs(
            global_state_lock: &mut GlobalStateLock,
            tx_proof_type: TxProvingCapability,
            fee: NativeCurrencyAmount,
        ) -> Arc<Transaction> {
            let change_key = global_state_lock
                .lock_guard()
                .await
                .wallet_state
                .wallet_entropy
                .nth_generation_spending_key_for_tests(0);
            let in_seven_months = global_state_lock
                .lock_guard()
                .await
                .chain
                .light_state()
                .header()
                .timestamp
                + Timestamp::months(7);

            let config = TxCreationConfig::default()
                .recover_change_off_chain(change_key.into())
                .with_prover_capability(tx_proof_type);
            global_state_lock
                .api()
                .tx_initiator_internal()
                .create_transaction(Vec::<TxOutput>::new().into(), fee, in_seven_months, config)
                .await
                .unwrap()
                .transaction
        }

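        /// Insert a foreign ProofCollection-backed transaction into the
        /// mempool and verify that the upgrader replaces it with a
        /// SingleProof-backed transaction and notifies the peer tasks.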
        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn upgrade_proof_collection_to_single_proof_foreign_tx() {
            let num_outgoing_connections = 0;
            let num_incoming_connections = 0;

            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(
                num_outgoing_connections,
                num_incoming_connections,
                cli_args::Args::default(),
            )
            .await;

            // Force the instance to create SingleProofs; otherwise CI and
            // other weak machines fail.
            let mocked_cli = cli_args::Args {
                tx_proving_capability: Some(TxProvingCapability::SingleProof),
                tx_proof_upgrading: true,
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let mut main_loop_handler = main_loop_handler.with_mocked_time(SystemTime::now());
            let mut mutable_main_loop_state = main_loop_handler.mutable();

            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task returns OK when run on empty mempool"
            );

            let fee = NativeCurrencyAmount::coins(1);
            let proof_collection_tx = tx_no_outputs(
                &mut main_loop_handler.global_state_lock,
                TxProvingCapability::ProofCollection,
                fee,
            )
            .await;

            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .mempool_insert((*proof_collection_tx).clone(), UpgradePriority::Irrelevant)
                .await;
            assert!(
                main_loop_handler
                    .proof_upgrader(&mut mutable_main_loop_state)
                    .await
                    .is_ok(),
                "Scheduled task must return OK when it's time to upgrade"
            );

            // Wait for the upgrade task to finish.
            let handle = mutable_main_loop_state.proof_upgrader_task.unwrap().await;
            assert!(
                handle.is_ok(),
                "Proof-upgrade task must finish successfully."
            );

            // At this point there should be one transaction in the mempool,
            // which is (if all is well) the merger of the ProofCollection
            // transaction inserted above and one of the upgrader's fee
            // gobblers. The point is that this transaction is a SingleProof
            // transaction, so test that.
            let (merged_txid, _) = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .mempool
                .fee_density_iter()
                .next_back()
                .expect("mempool should contain one item here");

            assert!(
                matches!(
                    main_loop_handler
                        .global_state_lock
                        .lock_guard()
                        .await
                        .mempool
                        .get(merged_txid)
                        .unwrap()
                        .proof,
                    TransactionProof::SingleProof(_)
                ),
                "Proof in mempool must now be of type single proof"
            );

            match main_to_peer_rx.recv().await {
                Ok(MainToPeerTask::TransactionNotification(tx_noti)) => {
                    assert_eq!(merged_txid, tx_noti.txid);
                    assert_eq!(TransactionProofQuality::SingleProof, tx_noti.proof_quality);
                },
                other => panic!("Must have sent transaction notification to peer loop after successful proof upgrade. Got:\n{other:?}"),
            }
        }
    }

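    /// Tests for peer pruning and peer discovery in the main loop.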
    mod peer_discovery {
        use super::*;

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn prune_peers_too_many_connections() {
            let num_init_peers_outgoing = 10;
            let num_init_peers_incoming = 4;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(
                num_init_peers_outgoing,
                num_init_peers_incoming,
                cli_args::Args::default(),
            )
            .await;

            let mocked_cli = cli_args::Args {
                max_num_peers: num_init_peers_outgoing as usize,
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

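            // 14 peers (10 outgoing + 4 incoming) are connected, but the
            // limit was just lowered to 10, so pruning must disconnect
            // 14 - 10 = 4 peers.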
            main_loop_handler.prune_peers().await.unwrap();
            assert_eq!(4, main_to_peer_rx.len());
            for _ in 0..4 {
                let peer_msg = main_to_peer_rx.recv().await.unwrap();
                assert!(matches!(peer_msg, MainToPeerTask::Disconnect(_)))
            }
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn prune_peers_not_too_many_connections() {
            let num_init_peers_outgoing = 10;
            let num_init_peers_incoming = 1;
            let TestSetup {
                mut main_loop_handler,
                main_to_peer_rx,
                ..
            } = setup(
                num_init_peers_outgoing,
                num_init_peers_incoming,
                cli_args::Args::default(),
            )
            .await;

            let mocked_cli = cli_args::Args {
                max_num_peers: 200,
                ..Default::default()
            };

            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            main_loop_handler.prune_peers().await.unwrap();
            assert!(main_to_peer_rx.is_empty());
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn skip_peer_discovery_if_peer_limit_is_exceeded() {
            let num_init_peers_outgoing = 2;
            let num_init_peers_incoming = 0;
            let TestSetup {
                mut main_loop_handler,
                ..
            } = setup(
                num_init_peers_outgoing,
                num_init_peers_incoming,
                cli_args::Args::default(),
            )
            .await;

            let mocked_cli = cli_args::Args {
                max_num_peers: 0,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let mut mutable_state = main_loop_handler.mutable();
            main_loop_handler
                .discover_peers(&mut mutable_state)
                .await
                .unwrap();

            assert!(logs_contain("Skipping peer discovery."));
        }

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn performs_peer_discovery_on_few_connections() {
            let num_init_peers_outgoing = 2;
            let num_init_peers_incoming = 0;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(
                num_init_peers_outgoing,
                num_init_peers_incoming,
                cli_args::Args::default(),
            )
            .await;

            // Set CLI to attempt to make more connections
            let mocked_cli = cli_args::Args {
                max_num_peers: 10,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;
            let mut mutable_state = main_loop_handler.mutable();
            main_loop_handler
                .discover_peers(&mut mutable_state)
                .await
                .unwrap();

            let peer_discovery_sent_messages_on_peer_channel = main_to_peer_rx.try_recv().is_ok();
            assert!(peer_discovery_sent_messages_on_peer_channel);
            assert!(logs_contain("Performing peer discovery"));
        }
    }

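    /// `SystemTime` is totally ordered, so `min_by` with `cmp` yields the
    /// earliest timestamp regardless of element order. The bootstrapper-mode
    /// logic relies on this when picking the longest-lived peer to drop.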
    #[test]
    fn older_systemtime_ranks_first() {
        let start = UNIX_EPOCH;
        let other = UNIX_EPOCH + Duration::from_secs(1000);
        let mut instants = [start, other];

        assert_eq!(
            start,
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
        );

        instants.reverse();

        assert_eq!(
            start,
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
        );
    }

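    /// In bootstrapper mode, an incoming connection request at the peer
    /// limit is accommodated by disconnecting the longest-lived existing
    /// connection rather than rejecting the newcomer.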
    mod bootstrapper_mode {

        use rand::Rng;

        use super::*;
        use crate::models::peer::PeerMessage;
        use crate::models::peer::TransferConnectionStatus;
        use crate::tests::shared::globalstate::get_dummy_peer_connection_data_genesis;
        use crate::tests::shared::to_bytes;

        #[apply(shared_tokio_runtime)]
        #[traced_test]
        async fn disconnect_from_oldest_peer_upon_connection_request() {
            // Set up a node in bootstrapper mode, connected to a given
            // number of peers, which is one less than the maximum. Initiate a
            // connection request. Verify that the oldest of the existing
            // connections is dropped.

            let network = Network::Main;
            let num_init_peers_outgoing = 5;
            let num_init_peers_incoming = 0;
            let TestSetup {
                mut main_loop_handler,
                mut main_to_peer_rx,
                ..
            } = setup(
                num_init_peers_outgoing,
                num_init_peers_incoming,
                cli_args::Args::default(),
            )
            .await;

            let mocked_cli = cli_args::Args {
                max_num_peers: usize::from(num_init_peers_outgoing) + 1,
                bootstrap: true,
                network,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            let mut mutable_main_loop_state = main_loop_handler.mutable();

            // Sanity check: at startup, we are connected to the initial
            // number of peers.
            assert_eq!(
                usize::from(num_init_peers_outgoing),
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .peer_map
                    .len()
            );

            // Randomize "connection established" timestamps.
            let mut rng = rand::rng();
            let now = SystemTime::now();
            let now_as_unix_timestamp = now.duration_since(UNIX_EPOCH).unwrap();
            main_loop_handler
                .global_state_lock
                .lock_guard_mut()
                .await
                .net
                .peer_map
                .iter_mut()
                .for_each(|(_socket_address, peer_info)| {
                    peer_info.set_connection_established(
                        UNIX_EPOCH
                            + Duration::from_millis(
                                rng.random_range(0..(now_as_unix_timestamp.as_millis() as u64)),
                            ),
                    );
                });

            // Compute which peer will be dropped, for later reference.
            let expected_drop_peer_socket_address = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .net
                .peer_map
                .iter()
                .min_by(|l, r| {
                    l.1.connection_established()
                        .cmp(&r.1.connection_established())
                })
                .map(|(socket_address, _peer_info)| socket_address)
                .copied()
                .unwrap();

            // Simulate an incoming connection.
            let (peer_handshake_data, peer_socket_address) =
                get_dummy_peer_connection_data_genesis(network, 1);
            let own_handshake_data = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .get_own_handshakedata();
            assert_eq!(peer_handshake_data.network, own_handshake_data.network);
            assert_eq!(peer_handshake_data.version, own_handshake_data.version);
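            // Mock the wire exchange: the peer sends a handshake request,
            // and this node is expected to reply with a handshake response
            // followed by an `Accepted` connection status.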
            let mock_stream = tokio_test::io::Builder::new()
                .read(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_REQUEST.to_vec(),
                        peer_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_RESPONSE.to_vec(),
                        own_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::ConnectionStatus(
                        TransferConnectionStatus::Accepted,
                    ))
                    .unwrap(),
                )
                .build();
            let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone();
            let global_state_lock_clone = main_loop_handler.global_state_lock.clone();
            let (_main_to_peer_tx_mock, main_to_peer_rx_mock) = tokio::sync::broadcast::channel(10);
            let incoming_peer_task_handle = tokio::task::Builder::new()
                .name("answer_peer_wrapper")
                .spawn(async move {
                    match answer_peer(
                        mock_stream,
                        global_state_lock_clone,
                        peer_socket_address,
                        main_to_peer_rx_mock,
                        peer_to_main_tx_clone,
                        own_handshake_data,
                    )
                    .await
                    {
                        Ok(()) => (),
                        Err(err) => error!("Got error: {:?}", err),
                    }
                })
                .unwrap();

            // The `answer_peer_wrapper` task should send a
            // `DisconnectFromLongestLivedPeer` message to main.
            let peer_to_main_message = main_loop_handler.peer_task_to_main_rx.recv().await.unwrap();
            assert!(matches!(
                peer_to_main_message,
                PeerTaskToMain::DisconnectFromLongestLivedPeer,
            ));

            // Process this message.
            main_loop_handler
                .handle_peer_task_message(peer_to_main_message, &mut mutable_main_loop_state)
                .await
                .unwrap();

            // The main loop should send a `Disconnect` message.
            let main_to_peers_message = main_to_peer_rx.recv().await.unwrap();
            let MainToPeerTask::Disconnect(observed_drop_peer_socket_address) =
                main_to_peers_message
            else {
                panic!("Expected disconnect, got {main_to_peers_message:?}");
            };

            // Match the observed droppee against the expectation.
            assert_eq!(
                expected_drop_peer_socket_address,
                observed_drop_peer_socket_address,
            );
            println!("Dropped connection with {expected_drop_peer_socket_address}.");

            // Don't forget to terminate the peer task, which is still running.
            incoming_peer_task_handle.abort();
        }
    }
}