
Neptune-Crypto / neptune-core, build 13911366652 (push, via GitHub)

17 Mar 2025 10:41PM UTC. Coverage: 84.279% (-0.04%) from 84.316%

Commit by Sword-Smith:
Revert "feat!: Communicate node's bootstrap status"

This reverts commit e62fa5bf3.

Revert to get rid of variant `PeerMessage::BootstrapStatus`, which
unfortunately doesn't seem to be backwards compatible.

38 of 40 new or added lines in 2 files covered (95.0%).
648 existing lines in 8 files are now uncovered.
50757 of 60225 relevant lines covered (84.28%).
174317.61 hits per line.

Source File: /src/main_loop.rs (56.37% covered)
pub mod proof_upgrader;

use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;
use std::time::SystemTime;

use anyhow::Result;
use itertools::Itertools;
use proof_upgrader::get_upgrade_task_from_mempool;
use proof_upgrader::UpdateMutatorSetDataJob;
use proof_upgrader::UpgradeJob;
use rand::prelude::IteratorRandom;
use rand::seq::IndexedRandom;
use tokio::net::TcpListener;
use tokio::select;
use tokio::signal;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;

use crate::connect_to_peers::answer_peer;
use crate::connect_to_peers::call_peer;
use crate::job_queue::triton_vm::TritonVmJobPriority;
use crate::job_queue::triton_vm::TritonVmJobQueue;
use crate::macros::fn_name;
use crate::macros::log_slow_scope;
use crate::models::blockchain::block::block_header::BlockHeader;
use crate::models::blockchain::block::block_height::BlockHeight;
use crate::models::blockchain::block::difficulty_control::ProofOfWork;
use crate::models::blockchain::block::Block;
use crate::models::blockchain::transaction::Transaction;
use crate::models::blockchain::transaction::TransactionProof;
use crate::models::channel::MainToMiner;
use crate::models::channel::MainToPeerTask;
use crate::models::channel::MainToPeerTaskBatchBlockRequest;
use crate::models::channel::MinerToMain;
use crate::models::channel::PeerTaskToMain;
use crate::models::channel::RPCServerToMain;
use crate::models::peer::handshake_data::HandshakeData;
use crate::models::peer::peer_info::PeerInfo;
use crate::models::peer::transaction_notification::TransactionNotification;
use crate::models::peer::PeerSynchronizationState;
use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
use crate::models::state::block_proposal::BlockProposal;
use crate::models::state::mempool::TransactionOrigin;
use crate::models::state::networking_state::SyncAnchor;
use crate::models::state::tx_proving_capability::TxProvingCapability;
use crate::models::state::GlobalState;
use crate::models::state::GlobalStateLock;
use crate::SUCCESS_EXIT_CODE;

const PEER_DISCOVERY_INTERVAL_IN_SECONDS: u64 = 120;
const SYNC_REQUEST_INTERVAL_IN_SECONDS: u64 = 3;
const MEMPOOL_PRUNE_INTERVAL_IN_SECS: u64 = 30 * 60; // 30 mins
const MP_RESYNC_INTERVAL_IN_SECS: u64 = 59;
const EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS: u64 = 19 * 60; // 19 mins

/// Interval at which the transaction-upgrade checker runs. Note that this
/// does *not* define how often a transaction-proof upgrade is actually
/// performed, only how often we check whether we're ready to perform one.
const TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS: u64 = 60; // 1 minute

const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;

/// Number of seconds within which an individual peer is expected to respond
/// to a synchronization request.
const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
    SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS;

/// Number of seconds that a synchronization may run without any progress.
const GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
    INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS * 4;
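
// With the values above, an individual peer times out after
// 40 * 3 = 120 seconds, and synchronization as a whole is abandoned after
// 4 * 120 = 480 seconds (8 minutes) without progress.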

const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;
const TX_UPDATER_CHANNEL_CAPACITY: usize = 1;

/// Wraps a transmission channel.
///
/// To be used for the transmission channel to the miner, because
///  a) the miner might not exist in which case there would be no-one to empty
///     the channel; and
///  b) contrary to other channels, transmission failures here are not critical.
#[derive(Debug)]
struct MainToMinerChannel(Option<mpsc::Sender<MainToMiner>>);

impl MainToMinerChannel {
    /// Send a message to the miner task (if any).
    fn send(&self, message: MainToMiner) {
        // Do not use the async `send` function because it blocks until there
        // is spare capacity on the channel. Messages to the miner are not
        // critical so if there is no capacity left, just log an error
        // message.
        if let Some(channel) = &self.0 {
            if let Err(e) = channel.try_send(message) {
                error!("Failed to send pause message to miner thread:\n{e}");
            }
        }
    }
}
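
// Note: `try_send` fails immediately (with a full-channel or closed-channel
// error) rather than awaiting capacity, so a slow or absent miner can never
// stall the main loop; at worst a non-critical message is dropped and logged.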

/// `MainLoopHandler` is the immutable part of the input to the main loop function
#[derive(Debug)]
pub struct MainLoopHandler {
    incoming_peer_listener: TcpListener,
    global_state_lock: GlobalStateLock,

    // note: broadcast::Sender::send() does not block
    main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,

    // note: mpsc::Sender::send() blocks if channel full.
    // locks should not be held across it.
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,

    // note: MainToMinerChannel::send() does not block. might log error.
    main_to_miner_tx: MainToMinerChannel,

    #[cfg(test)]
    mock_now: Option<SystemTime>,
}

/// The mutable part of the main loop function
struct MutableMainLoopState {
    /// Information used to batch-download blocks.
    sync_state: SyncState,

    /// Information about potential peers for new connections.
    potential_peers: PotentialPeersState,

    /// A list of joinhandles to spawned tasks.
    task_handles: Vec<JoinHandle<()>>,

    /// A joinhandle to a task performing transaction-proof upgrades.
    proof_upgrader_task: Option<JoinHandle<()>>,

    /// A joinhandle to a task running the update of the mempool transactions.
    update_mempool_txs_handle: Option<JoinHandle<()>>,

    /// A channel that the task updating mempool transactions can use to
    /// communicate its result.
    update_mempool_receiver: mpsc::Receiver<Vec<Transaction>>,
}

impl MutableMainLoopState {
    fn new(task_handles: Vec<JoinHandle<()>>) -> Self {
        let (_dummy_sender, dummy_receiver) =
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);
        Self {
            sync_state: SyncState::default(),
            potential_peers: PotentialPeersState::default(),
            task_handles,
            proof_upgrader_task: None,
            update_mempool_txs_handle: None,
            update_mempool_receiver: dummy_receiver,
        }
    }
}

/// handles batch-downloading of blocks if we are more than n blocks behind
#[derive(Default, Debug)]
struct SyncState {
    peer_sync_states: HashMap<SocketAddr, PeerSynchronizationState>,
    last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>,
}

impl SyncState {
    fn record_request(
        &mut self,
        requested_block_height: BlockHeight,
        peer: SocketAddr,
        now: SystemTime,
    ) {
        self.last_sync_request = Some((now, requested_block_height, peer));
    }

    /// Return a list of peers that have reported to be in possession of blocks
    /// with a PoW above a threshold.
    fn get_potential_peers_for_sync_request(&self, threshold_pow: ProofOfWork) -> Vec<SocketAddr> {
        self.peer_sync_states
            .iter()
            .filter(|(_sa, sync_state)| sync_state.claimed_max_pow > threshold_pow)
            .map(|(sa, _)| *sa)
            .collect()
    }

    /// Determine if a peer should be sanctioned for failing to respond to a
    /// synchronization request fast enough. Also determine if a new request
    /// should be made or the previous one should be allowed to run for longer.
    ///
    /// Returns (peer to be sanctioned, attempt new request).
    fn get_status_of_last_request(
        &self,
        current_block_height: BlockHeight,
        now: SystemTime,
    ) -> (Option<SocketAddr>, bool) {
        // A peer is sanctioned if no answer has been received after N times the sync request
        // interval.
        match self.last_sync_request {
            None => {
                // No sync request has been made since startup of program
                (None, true)
            }
            Some((req_time, requested_height, peer_sa)) => {
                if requested_height < current_block_height {
                    // The last sync request updated the state
                    (None, true)
                } else if req_time
                    + Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS)
                    < now
                {
                    // The last sync request was not answered, sanction peer
                    // and make a new sync request.
                    (Some(peer_sa), true)
                } else {
                    // The last sync request has not yet been answered. But it has
                    // not timed out yet.
                    (None, false)
                }
            }
        }
    }
}
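
// Summary of the outcomes above: (None, true) when no request is pending or
// the last one already advanced the tip; (Some(peer), true) when the pending
// request timed out; (None, false) while a request is still within its deadline.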

/// holds information about a potential peer in the process of peer discovery
struct PotentialPeerInfo {
    _reported: SystemTime,
    _reported_by: SocketAddr,
    instance_id: u128,
    distance: u8,
}

impl PotentialPeerInfo {
    fn new(reported_by: SocketAddr, instance_id: u128, distance: u8, now: SystemTime) -> Self {
        Self {
            _reported: now,
            _reported_by: reported_by,
            instance_id,
            distance,
        }
    }
}

/// holds information about a set of potential peers in the process of peer discovery
struct PotentialPeersState {
    potential_peers: HashMap<SocketAddr, PotentialPeerInfo>,
}

impl PotentialPeersState {
    fn default() -> Self {
        Self {
            potential_peers: HashMap::new(),
        }
    }

    fn add(
        &mut self,
        reported_by: SocketAddr,
        potential_peer: (SocketAddr, u128),
        max_peers: usize,
        distance: u8,
        now: SystemTime,
    ) {
        let potential_peer_socket_address = potential_peer.0;
        let potential_peer_instance_id = potential_peer.1;

        // This check *should* make it likely that a potential peer is always
        // registered with the lowest observed distance.
        if self
            .potential_peers
            .contains_key(&potential_peer_socket_address)
        {
            return;
        }

        // If this data structure is full, remove a random entry. Then add this.
        if self.potential_peers.len()
            > max_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS
        {
            let mut rng = rand::rng();
            let random_potential_peer = self
                .potential_peers
                .keys()
                .choose(&mut rng)
                .unwrap()
                .to_owned();
            self.potential_peers.remove(&random_potential_peer);
        }

        let insert_value =
            PotentialPeerInfo::new(reported_by, potential_peer_instance_id, distance, now);
        self.potential_peers
            .insert(potential_peer_socket_address, insert_value);
    }

    /// Return a peer from the potential peer list that we aren't connected to
    /// and that isn't our own address.
    ///
    /// Favors peers with a high distance and with IPs that we are not already
    /// connected to.
    ///
    /// Returns (socket address, peer distance)
    fn get_candidate(
        &self,
        connected_clients: &[PeerInfo],
        own_instance_id: u128,
    ) -> Option<(SocketAddr, u8)> {
        let peers_instance_ids: Vec<u128> =
            connected_clients.iter().map(|x| x.instance_id()).collect();

        // Only pick those peers that report a listening port
        let peers_listen_addresses: Vec<SocketAddr> = connected_clients
            .iter()
            .filter_map(|x| x.listen_address())
            .collect();

        // Find the appropriate candidates
        let candidates = self
            .potential_peers
            .iter()
            // Prevent connecting to self. Note that we *only* use instance ID to prevent this,
            // meaning this will allow multiple nodes e.g. running on the same computer to form
            // a complete graph.
            .filter(|pp| pp.1.instance_id != own_instance_id)
            // Prevent connecting to peers we are already connected to
            .filter(|potential_peer| !peers_instance_ids.contains(&potential_peer.1.instance_id))
            .filter(|potential_peer| !peers_listen_addresses.contains(potential_peer.0))
            .collect::<Vec<_>>();

        // Prefer candidates with IPs that we are not already connected to but
        // connect to repeated IPs in case we don't have other options, as
        // repeated IPs may just be multiple machines on the same NAT'ed IPv4
        // address.
        let mut connected_ips = peers_listen_addresses.into_iter().map(|x| x.ip());
        let candidates = if candidates
            .iter()
            .any(|candidate| !connected_ips.contains(&candidate.0.ip()))
        {
            candidates
                .into_iter()
                .filter(|candidate| !connected_ips.contains(&candidate.0.ip()))
                .collect()
        } else {
            candidates
        };

        // Get the candidate list with the highest distance
        let max_distance_candidates = candidates.iter().max_by_key(|pp| pp.1.distance);

        // Pick a random candidate from the appropriate candidates
        let mut rng = rand::rng();
        max_distance_candidates
            .iter()
            .choose(&mut rng)
            .map(|x| (x.0.to_owned(), x.1.distance))
    }
}

/// Return a boolean indicating whether the node should stay in
/// synchronization mode.
fn stay_in_sync_mode(
    own_block_tip_header: &BlockHeader,
    sync_state: &SyncState,
    sync_mode_threshold: usize,
) -> bool {
    let max_claimed_pow = sync_state
        .peer_sync_states
        .values()
        .max_by_key(|x| x.claimed_max_pow);
    match max_claimed_pow {
        None => false, // No peer has passed the sync challenge phase.

        // Synchronization is left when the remaining number of blocks is half
        // of what has been indicated to fit into RAM
        Some(max_claim) => {
            own_block_tip_header.cumulative_proof_of_work < max_claim.claimed_max_pow
                && max_claim.claimed_max_height - own_block_tip_header.height
                    > sync_mode_threshold as i128 / 2
        }
    }
}
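
// Example: with `sync_mode_threshold` = 1000, a node stays in sync mode as
// long as the strongest peer claims more cumulative PoW and is more than 500
// blocks ahead; once 500 or fewer blocks remain, sync mode is left.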

impl MainLoopHandler {
    pub(crate) fn new(
        incoming_peer_listener: TcpListener,
        global_state_lock: GlobalStateLock,
        main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
        peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
        main_to_miner_tx: mpsc::Sender<MainToMiner>,
    ) -> Self {
        let maybe_main_to_miner_tx = if global_state_lock.cli().mine() {
            Some(main_to_miner_tx)
        } else {
            None
        };
        Self {
            incoming_peer_listener,
            global_state_lock,
            main_to_miner_tx: MainToMinerChannel(maybe_main_to_miner_tx),
            main_to_peer_broadcast_tx,
            peer_task_to_main_tx,
            #[cfg(test)]
            mock_now: None,
        }
    }

    /// Allows for mocked timestamps such that time dependencies may be tested.
    #[cfg(test)]
    fn with_mocked_time(mut self, mocked_time: SystemTime) -> Self {
        self.mock_now = Some(mocked_time);
        self
    }

    fn now(&self) -> SystemTime {
        #[cfg(not(test))]
        {
            SystemTime::now()
        }
        #[cfg(test)]
        {
            self.mock_now.unwrap_or(SystemTime::now())
        }
    }
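
    // Illustrative use (test code only): advancing the clock past a timeout,
    // e.g. `handler.with_mocked_time(SystemTime::now() + Duration::from_secs(500))`,
    // lets tests exercise paths such as the global synchronization timeout.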

    /// Run a list of Triton VM prover jobs that update the mutator set state
    /// for transactions.
    ///
    /// Sends the result back through the provided channel.
    async fn update_mempool_jobs(
        update_jobs: Vec<UpdateMutatorSetDataJob>,
        job_queue: &TritonVmJobQueue,
        transaction_update_sender: mpsc::Sender<Vec<Transaction>>,
        proof_job_options: TritonVmProofJobOptions,
    ) {
        debug!(
            "Attempting to update transaction proofs of {} transactions",
            update_jobs.len()
        );
        let mut result = vec![];
        for job in update_jobs {
            // Jobs for updating txs in the mempool have highest priority since
            // they block the composer from continuing.
            // TODO: Handle errors better here.
            let job_result = job
                .upgrade(job_queue, proof_job_options.clone())
                .await
                .unwrap();
            result.push(job_result);
        }

        transaction_update_sender
            .send(result)
            .await
            .expect("Receiver for updated txs in main loop must still exist");
    }

    /// Handles a list of transactions whose proofs have been updated with new
    /// mutator set data.
    async fn handle_updated_mempool_txs(&mut self, updated_txs: Vec<Transaction>) {
        // Update mempool with updated transactions
        {
            let mut state = self.global_state_lock.lock_guard_mut().await;
            for updated in &updated_txs {
                let txid = updated.kernel.txid();
                if let Some(tx) = state.mempool.get_mut(txid) {
                    *tx = updated.to_owned();
                } else {
                    warn!("Updated transaction which is no longer in mempool");
                }
            }
        }

        // Then notify all peers
        for updated in updated_txs {
            self.main_to_peer_broadcast_tx
                .send(MainToPeerTask::TransactionNotification(
                    (&updated).try_into().unwrap(),
                ))
                .unwrap();
        }

        // Tell miner that it can now start composing next block.
        self.main_to_miner_tx.send(MainToMiner::Continue);
    }

    /// Process a block whose PoW solution was found by this client (or an
    /// external program) and that has not yet been seen by the rest of the
    /// network.
    ///
    /// Shares the block with all connected peers, updates own state, and
    /// updates any mempool transactions to be valid under this new block.
    ///
    /// Locking:
    ///  * acquires `global_state_lock` for write
    async fn handle_self_guessed_block(
        &mut self,
        main_loop_state: &mut MutableMainLoopState,
        new_block: Box<Block>,
    ) -> Result<()> {
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;

        if !global_state_mut.incoming_block_is_more_canonical(&new_block) {
            drop(global_state_mut); // don't hold across send()
            warn!("Got new block from miner that was not child of tip. Discarding.");
            self.main_to_miner_tx.send(MainToMiner::Continue);
            return Ok(());
        }
        info!("Locally-mined block is new tip: {}", new_block.hash());

        // Share block with peers first thing.
        info!("broadcasting new block to peers");
        self.main_to_peer_broadcast_tx
            .send(MainToPeerTask::Block(new_block.clone()))
            .expect("Peer handler broadcast channel prematurely closed.");

        let update_jobs = global_state_mut.set_new_tip(*new_block).await?;
        drop(global_state_mut);

        self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

        Ok(())
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_miner_task_message(
        &mut self,
        msg: MinerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<Option<i32>> {
        match msg {
            MinerToMain::NewBlockFound(new_block_info) => {
                log_slow_scope!(fn_name!() + "::MinerToMain::NewBlockFound");

                let new_block = new_block_info.block;

                info!("Miner found new block: {}", new_block.kernel.header.height);
                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
            }
            MinerToMain::BlockProposal(boxed_proposal) => {
                let (block, expected_utxos) = *boxed_proposal;

                // If block proposal from miner does not build on current tip,
                // don't broadcast it. This check covers reorgs as well.
                let current_tip = self
                    .global_state_lock
                    .lock_guard()
                    .await
                    .chain
                    .light_state()
                    .clone();
                if block.header().prev_block_digest != current_tip.hash() {
                    warn!(
                        "Got block proposal from miner that does not build on current tip. \
                           Rejecting. If this happens a lot, then maybe this machine is too \
                           slow to competitively compose blocks. Consider running the client only \
                           with the guesser flag set and not the compose flag."
                    );
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                // Ensure proposal validity before sharing
                if !block.is_valid(&current_tip, block.header().timestamp).await {
                    error!("Own block proposal invalid. This should not happen.");
                    self.main_to_miner_tx.send(MainToMiner::Continue);
                    return Ok(None);
                }

                if !self.global_state_lock.cli().secret_compositions {
                    self.main_to_peer_broadcast_tx
                    .send(MainToPeerTask::BlockProposalNotification((&block).into()))
                    .expect(
                        "Peer handler broadcast channel prematurely closed. This should never happen.",
                    );
                }

                {
                    // Use block proposal and add expected UTXOs from this
                    // proposal.
                    let mut state = self.global_state_lock.lock_guard_mut().await;
                    state.mining_state.block_proposal =
                        BlockProposal::own_proposal(block.clone(), expected_utxos.clone());
                    state.wallet_state.add_expected_utxos(expected_utxos).await;
                }

                // Indicate to miner that block proposal was successfully
                // received by main-loop.
                self.main_to_miner_tx.send(MainToMiner::Continue);
            }
            MinerToMain::Shutdown(exit_code) => {
                return Ok(Some(exit_code));
            }
        }

        Ok(None)
    }

    /// Locking:
    ///   * acquires `global_state_lock` for write
    async fn handle_peer_task_message(
        &mut self,
        msg: PeerTaskToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<()> {
        debug!("Received {} from a peer task", msg.get_type());
        let cli_args = self.global_state_lock.cli().clone();
        match msg {
            PeerTaskToMain::NewBlocks(blocks) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::NewBlocks");

                let last_block = blocks.last().unwrap().to_owned();
                let update_jobs = {
                    // The peer tasks also check whether the block is more
                    // canonical than the current tip, but we have to check it
                    // again since a block update might already have been
                    // applied through a message from another peer (or from our
                    // own miner).
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let new_canonical =
                        global_state_mut.incoming_block_is_more_canonical(&last_block);

                    if !new_canonical {
                        // The blocks are not canonical, but: if we are in sync
                        // mode and these blocks beat our current champion, then
                        // we store them anyway, without marking them as tip.
                        let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else {
                            warn!(
                                "Blocks were not new, and we're not syncing. Not storing blocks."
                            );
                            return Ok(());
                        };
                        if sync_anchor
                            .champion
                            .is_some_and(|(height, _)| height >= last_block.header().height)
                        {
                            warn!("Repeated blocks received in sync mode, not storing");
                            return Ok(());
                        }

                        sync_anchor.catch_up(last_block.header().height, last_block.hash());

                        for block in blocks {
                            global_state_mut.store_block_not_tip(block).await?;
                        }

                        return Ok(());
                    }

                    info!(
                        "Last block from peer is new canonical tip: {}; height: {}",
                        last_block.hash(),
                        last_block.header().height
                    );

                    // Ask miner to stop work until state update is completed
                    self.main_to_miner_tx.send(MainToMiner::WaitForContinue);

                    // Get out of sync mode if needed
                    if global_state_mut.net.sync_anchor.is_some() {
                        let stay_in_sync_mode = stay_in_sync_mode(
                            &last_block.kernel.header,
                            &main_loop_state.sync_state,
                            cli_args.sync_mode_threshold,
                        );
                        if !stay_in_sync_mode {
                            info!("Exiting sync mode");
                            global_state_mut.net.sync_anchor = None;
                            self.main_to_miner_tx.send(MainToMiner::StopSyncing);
                        }
                    }

                    let mut update_jobs: Vec<UpdateMutatorSetDataJob> = vec![];
                    for new_block in blocks {
                        debug!(
                            "Storing block {} in database. Height: {}, Mined: {}",
                            new_block.hash(),
                            new_block.kernel.header.height,
                            new_block.kernel.header.timestamp.standard_format()
                        );

                        // Potential race condition here: if the last block is
                        // new and canonical but the first block is already
                        // known, we'll store the same block twice. That should
                        // be OK though, as the appropriate database entries are
                        // simply overwritten with the new block info. See the
                        // [GlobalState::test::setting_same_tip_twice_is_allowed]
                        // test for a test of this phenomenon.

                        let update_jobs_ = global_state_mut.set_new_tip(new_block).await?;
                        update_jobs.extend(update_jobs_);
                    }

                    update_jobs
                };

                // Inform all peers about new block
                self.main_to_peer_broadcast_tx
                    .send(MainToPeerTask::Block(Box::new(last_block.clone())))
                    .expect("Peer handler broadcast was closed. This should never happen");

                // Spawn task to handle mempool tx-updating after new blocks.
                // TODO: Do clever trick to collapse all jobs relating to the same transaction,
                //       identified by transaction-ID, into *one* update job.
                self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);

                // Inform miner about new block.
                self.main_to_miner_tx.send(MainToMiner::NewBlock);
            }
            PeerTaskToMain::AddPeerMaxBlockHeight {
                peer_address,
                claimed_height,
                claimed_cumulative_pow,
                claimed_block_mmra,
            } => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight");

                let claimed_state =
                    PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow);
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .insert(peer_address, claimed_state);

                // Check if synchronization mode should be activated.
                // Synchronization mode is entered if accumulated PoW exceeds
                // our tip and if the height difference is positive and beyond
                // a threshold value.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow)
                    && global_state_mut
                        .net
                        .sync_anchor
                        .as_ref()
                        .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow)
                {
                    info!(
                        "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}",
                        peer_address, claimed_height, claimed_cumulative_pow
                    );
                    global_state_mut.net.sync_anchor =
                        Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra));
                    self.main_to_miner_tx.send(MainToMiner::StartSyncing);
                }
            }
            PeerTaskToMain::RemovePeerMaxBlockHeight(socket_addr) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::RemovePeerMaxBlockHeight");

                debug!(
                    "Removing max block height from sync data structure for peer {}",
                    socket_addr
                );
                main_loop_state
                    .sync_state
                    .peer_sync_states
                    .remove(&socket_addr);

                // Get out of sync mode if needed.
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;

                if global_state_mut.net.sync_anchor.is_some() {
                    let stay_in_sync_mode = stay_in_sync_mode(
                        global_state_mut.chain.light_state().header(),
                        &main_loop_state.sync_state,
                        cli_args.sync_mode_threshold,
                    );
                    if !stay_in_sync_mode {
                        info!("Exiting sync mode");
                        global_state_mut.net.sync_anchor = None;
                    }
                }
            }
            PeerTaskToMain::PeerDiscoveryAnswer((pot_peers, reported_by, distance)) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer");

                let max_peers = self.global_state_lock.cli().max_num_peers;
                for pot_peer in pot_peers {
                    main_loop_state.potential_peers.add(
                        reported_by,
                        pot_peer,
                        max_peers,
                        distance,
                        self.now(),
                    );
                }
            }
            PeerTaskToMain::Transaction(pt2m_transaction) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::Transaction");

                debug!(
                    "`peer_loop` received the following transaction from a peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    pt2m_transaction.transaction.kernel.inputs.len(),
                    pt2m_transaction.transaction.kernel.outputs.len(),
                    pt2m_transaction.transaction.kernel.mutator_set_hash
                );

                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                if pt2m_transaction.confirmable_for_block
                    != global_state_mut.chain.light_state().hash()
                {
                    warn!("main loop got unmined transaction with bad mutator set data, discarding transaction");
                    return Ok(());
                }

                // Insert into mempool
                global_state_mut
                    .mempool_insert(
                        pt2m_transaction.transaction.to_owned(),
                        TransactionOrigin::Foreign,
                    )
                    .await;

                // send notification to peers
                let transaction_notification: TransactionNotification =
                    (&pt2m_transaction.transaction).try_into()?;
                self.main_to_peer_broadcast_tx
                    .send(MainToPeerTask::TransactionNotification(
                        transaction_notification,
                    ))?;
            }
            PeerTaskToMain::BlockProposal(block) => {
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::BlockProposal");

                debug!("main loop received block proposal from peer loop");

                // Due to race conditions, we need to verify that this block
                // proposal is still the immediate child of tip. If it is, and
                // it has a higher guesser fee than what we're currently
                // working on, then we switch to this one and notify the miner
                // to mine on this new block. We don't need to verify the
                // block's validity, since that was done in the peer loop.
                // To ensure atomicity, a write-lock must be held over global
                // state while we check if this proposal is favorable.
                {
                    info!("Received new favorable block proposal for mining operation.");
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
                    let verdict = global_state_mut.favor_incoming_block_proposal(
                        block.header().height,
                        block.total_guesser_reward(),
                    );
                    if let Err(reject_reason) = verdict {
                        warn!("main loop got unfavorable block proposal. Reason: {reject_reason}");
                        return Ok(());
                    }

                    global_state_mut.mining_state.block_proposal =
                        BlockProposal::foreign_proposal(*block.clone());
                }

                // Notify all peers of the block proposal we just accepted
                self.main_to_peer_broadcast_tx
                    .send(MainToPeerTask::BlockProposalNotification((&*block).into()))?;

                self.main_to_miner_tx.send(MainToMiner::NewBlockProposal);
            }
            PeerTaskToMain::DisconnectFromLongestLivedPeer => {
                let global_state = self.global_state_lock.lock_guard().await;

                // get all peers
                let all_peers = global_state.net.peer_map.iter();

                // filter out CLI peers
                let disconnect_candidates =
                    all_peers.filter(|p| !global_state.cli_peers().contains(p.0));

                // find the one with the oldest connection
                let longest_lived_peer = disconnect_candidates.min_by(
                    |(_socket_address_left, peer_info_left),
                     (_socket_address_right, peer_info_right)| {
                        peer_info_left
                            .connection_established()
                            .cmp(&peer_info_right.connection_established())
                    },
                );

                // tell to disconnect
                if let Some((peer_socket, _peer_info)) = longest_lived_peer {
                    self.main_to_peer_broadcast_tx
                        .send(MainToPeerTask::Disconnect(peer_socket.to_owned()))?;
                }
            }
        }

        Ok(())
    }

    /// If necessary, disconnect from peers.
    ///
    /// While a reasonable effort is made to never have more connections than
    /// [`max_num_peers`](crate::config_models::cli_args::Args::max_num_peers),
    /// this is not guaranteed. For example, bootstrap nodes temporarily allow a
    /// surplus of incoming connections to provide their service more reliably.
    ///
    /// Never disconnects peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn prune_peers(&self) -> Result<()> {
        // fetch all relevant info from global state; don't hold the lock
        let cli_args = self.global_state_lock.cli();
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .values()
            .cloned()
            .collect_vec();

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;
        if num_peers <= max_num_peers {
            debug!("No need to prune any peer connections.");
            return Ok(());
        }
        warn!("Connected to {num_peers} peers, which exceeds the maximum ({max_num_peers}).");

        // If all connections are outbound, it's OK to exceed the max.
        if connected_peers.iter().all(|p| p.connection_is_outbound()) {
            warn!("Not disconnecting from any peer because all connections are outbound.");
            return Ok(());
        }

        let num_peers_to_disconnect = num_peers - max_num_peers;
        let peers_to_disconnect = connected_peers
            .into_iter()
            .filter(|peer| !cli_args.peers.contains(&peer.connected_address()))
            .choose_multiple(&mut rand::rng(), num_peers_to_disconnect);
        match peers_to_disconnect.len() {
            0 => warn!("Not disconnecting from any peer because of manual override."),
            i => info!("Disconnecting from {i} peers."),
        }
        for peer in peers_to_disconnect {
            self.main_to_peer_broadcast_tx
                .send(MainToPeerTask::Disconnect(peer.connected_address()))?;
        }

        Ok(())
    }
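
    // Example: with max_num_peers = 10 and 14 connections (not all outbound),
    // the four peers to drop are chosen at random among those not given on
    // the command line.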

    /// If necessary, reconnect to the peers listed as CLI arguments.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn reconnect(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let connected_peers = self
            .global_state_lock
            .lock_guard()
            .await
            .net
            .peer_map
            .keys()
            .copied()
            .collect_vec();
        let peers_with_lost_connection = self
            .global_state_lock
            .cli()
            .peers
            .iter()
            .filter(|peer| !connected_peers.contains(peer));

        // If no connection was lost, there's nothing to do.
        if peers_with_lost_connection.clone().count() == 0 {
            return Ok(());
        }

        // Else, try to reconnect.
        let own_handshake_data = self
            .global_state_lock
            .lock_guard()
            .await
            .get_own_handshakedata();
        for &peer_with_lost_connection in peers_with_lost_connection {
            // Disallow reconnection if peer is in bad standing
            let peer_standing = self
                .global_state_lock
                .lock_guard()
                .await
                .net
                .get_peer_standing_from_database(peer_with_lost_connection.ip())
                .await;
            if peer_standing.is_some_and(|standing| standing.is_bad()) {
                info!("Not reconnecting to peer in bad standing: {peer_with_lost_connection}");
                continue;
            }

            info!("Attempting to reconnect to peer: {peer_with_lost_connection}");
            let global_state_lock = self.global_state_lock.clone();
            let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
            let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
            let own_handshake_data = own_handshake_data.clone();
            let outgoing_connection_task = tokio::task::Builder::new()
                .name("call_peer_wrapper_1")
                .spawn(async move {
                    call_peer(
                        peer_with_lost_connection,
                        global_state_lock,
                        main_to_peer_broadcast_rx,
                        peer_task_to_main_tx,
                        own_handshake_data,
                        1, // All CLI-specified peers have distance 1
                    )
                    .await;
                })?;
            main_loop_state.task_handles.push(outgoing_connection_task);
            main_loop_state.task_handles.retain(|th| !th.is_finished());
        }

        Ok(())
    }

    /// Perform peer discovery.
    ///
    /// Peer discovery involves finding potential peers from connected peers
    /// and attempting to establish a connection with one of them.
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn discover_peers(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        // fetch all relevant info from global state, then release the lock
        let cli_args = self.global_state_lock.cli();
        let global_state = self.global_state_lock.lock_guard().await;
        let connected_peers = global_state.net.peer_map.values().cloned().collect_vec();
        let own_instance_id = global_state.net.instance_id;
        let own_handshake_data = global_state.get_own_handshakedata();
        drop(global_state);

        let num_peers = connected_peers.len();
        let max_num_peers = cli_args.max_num_peers;

        // Don't make an outgoing connection if
        // - the peer limit is reached (or exceeded), or
        // - the peer limit is _almost_ reached; reserve the last slot for an
        //   incoming connection.
        if num_peers >= max_num_peers || num_peers > 2 && num_peers - 1 == max_num_peers {
            info!("Connected to {num_peers} peers. The configured max is {max_num_peers} peers.");
            info!("Skipping peer discovery.");
            return Ok(());
        }

        info!("Performing peer discovery");

        // Ask all peers for their peer lists. This will eventually – once the
        // responses have come in – update the list of potential peers.
        self.main_to_peer_broadcast_tx
            .send(MainToPeerTask::MakePeerDiscoveryRequest)?;

        // Get a peer candidate from the list of potential peers. Generally,
        // the peer lists requested in the previous step will not have come in
        // yet. Therefore, the new candidate is selected based on somewhat
        // (but not overly) old information.
        let Some((peer_candidate, candidate_distance)) = main_loop_state
            .potential_peers
            .get_candidate(&connected_peers, own_instance_id)
        else {
            info!("Found no peer candidate to connect to. Not making new connection.");
            return Ok(());
        };

        // Try to connect to the selected candidate.
        info!("Connecting to peer {peer_candidate} with distance {candidate_distance}");
        let global_state_lock = self.global_state_lock.clone();
        let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
        let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
        let outgoing_connection_task = tokio::task::Builder::new()
            .name("call_peer_wrapper_2")
            .spawn(async move {
                call_peer(
                    peer_candidate,
                    global_state_lock,
                    main_to_peer_broadcast_rx,
                    peer_task_to_main_tx,
                    own_handshake_data,
                    candidate_distance,
                )
                .await;
            })?;
        main_loop_state.task_handles.push(outgoing_connection_task);
        main_loop_state.task_handles.retain(|th| !th.is_finished());

        // Immediately request the new peer's peer list. This allows
        // incorporating the new peer's peers into the list of potential peers,
        // to be used in the next round of peer discovery.
        self.main_to_peer_broadcast_tx
            .send(MainToPeerTask::MakeSpecificPeerDiscoveryRequest(
                peer_candidate,
            ))?;

        Ok(())
    }

    /// Return a list of block heights for a block-batch request.
    ///
    /// Returns an ordered list of heights of the *most preferred blocks* to
    /// build on, where the current tip is always the most preferred block.
    ///
    /// Uses a factor to ensure that the peer will always have something to
    /// build on top of by providing potential starting points all the way
    /// back to genesis.
    fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec<BlockHeight> {
        const FACTOR: f64 = 1.07f64;

        let mut look_behind = 0;
        let mut ret = vec![];

        // A factor of 1.07 can look back ~1m blocks in 200 digests.
        while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 {
            let height = match own_tip_height.checked_sub(look_behind) {
                None => break,
                Some(height) if height.is_genesis() => break,
                Some(height) => height,
            };

            ret.push(height);
            look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64;
        }

        ret.push(BlockHeight::genesis());

        ret
    }
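
    // Concretely, the look-behind offsets start 0, 1, 2, ..., 14, then thin
    // out geometrically (16, 18, 20, ...), growing by roughly 7% per step.
    // For a tip at height H the requested heights thus run H, H-1, ...,
    // H-14, H-16, ... down toward genesis.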

    /// Logic for requesting the batch-download of blocks from peers
    ///
    /// Locking:
    ///   * acquires `global_state_lock` for read
    async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        let global_state = self.global_state_lock.lock_guard().await;

        // Check if we are in sync mode
        let Some(anchor) = &global_state.net.sync_anchor else {
            return Ok(());
        };

        info!("Running sync");

        let (own_tip_hash, own_tip_height, own_cumulative_pow) = (
            global_state.chain.light_state().hash(),
            global_state.chain.light_state().kernel.header.height,
            global_state
                .chain
                .light_state()
                .kernel
                .header
                .cumulative_proof_of_work,
        );

        // Check if sync mode has timed out entirely, in which case it should
        // be abandoned.
        let anchor = anchor.to_owned();
        if self.now().duration_since(anchor.updated)?.as_secs()
            > GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS
        {
            warn!("Sync mode has timed out. Abandoning sync mode.");

            // Abandon attempt, and punish all peers claiming to serve these
            // blocks.
            drop(global_state);
            self.global_state_lock
                .lock_guard_mut()
                .await
                .net
                .sync_anchor = None;

            let peers_to_punish = main_loop_state
                .sync_state
                .get_potential_peers_for_sync_request(own_cumulative_pow);

            for peer in peers_to_punish {
                self.main_to_peer_broadcast_tx
                    .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?;
            }

            return Ok(());
        }

        let (peer_to_sanction, try_new_request): (Option<SocketAddr>, bool) = main_loop_state
            .sync_state
            .get_status_of_last_request(own_tip_height, self.now());

        // Sanction peer if they failed to respond
        if let Some(peer) = peer_to_sanction {
            self.main_to_peer_broadcast_tx
                .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?;
        }

        if !try_new_request {
            info!("Waiting for last sync to complete.");
            return Ok(());
        }

        // Create the next request from the reported heights
        info!("Creating new sync request");

        // Pick a random peer that has reported to have relevant blocks
        let candidate_peers = main_loop_state
            .sync_state
            .get_potential_peers_for_sync_request(own_cumulative_pow);
        let mut rng = rand::rng();
        let chosen_peer = candidate_peers.choose(&mut rng);
        assert!(
            chosen_peer.is_some(),
            "A synchronization candidate must be available for a request. \
            Otherwise, the data structure is in an invalid state and syncing should not be active"
        );

        let ordered_preferred_block_digests = match anchor.champion {
            Some((_height, digest)) => vec![digest],
            None => {
                // Find candidate-UCA digests based on a sparse distribution of
                // block heights skewed towards own tip height
                let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height);
                let mut ordered_preferred_block_digests = vec![];
                for height in request_heights {
                    let digest = global_state
                        .chain
                        .archival_state()
                        .archival_block_mmr
                        .ammr()
                        .get_leaf_async(height.into())
                        .await;
                    ordered_preferred_block_digests.push(digest);
                }
                ordered_preferred_block_digests
            }
        };

        // Send message to the relevant peer loop to request the blocks
        let chosen_peer = chosen_peer.unwrap();
        info!(
            "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}",
            chosen_peer, own_tip_hash, own_tip_height
        );
        self.main_to_peer_broadcast_tx
            .send(MainToPeerTask::RequestBlockBatch(
                MainToPeerTaskBatchBlockRequest {
                    peer_addr_target: *chosen_peer,
                    known_blocks: ordered_preferred_block_digests,
                    anchor_mmr: anchor.block_mmr.clone(),
                },
            ))
            .expect("Sending message to peers must succeed");

        // Record that this request was sent to the peer
        let requested_block_height = own_tip_height.next();
        main_loop_state
            .sync_state
            .record_request(requested_block_height, *chosen_peer, self.now());

        Ok(())
    }
3✔
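
    // Minimal sketch, illustrative only and not part of the node's API: the
    // two-level timeout policy that `block_sync` implements above. A timed-out
    // *request* sanctions one peer but keeps syncing; a timed-out *sync mode*
    // abandons the anchor and sanctions every candidate peer.
    #[allow(dead_code)]
    fn example_sync_timeout_policy(
        secs_since_anchor_update: u64,
        request_timed_out: bool,
        may_issue_new_request: bool,
    ) -> &'static str {
        if secs_since_anchor_update > GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS {
            "abandon sync mode; sanction all peers that claimed the heavier chain"
        } else if request_timed_out && may_issue_new_request {
            "sanction the unresponsive peer, then send a fresh batch request"
        } else if may_issue_new_request {
            "send a fresh batch request to a random candidate peer"
        } else {
            "wait for the outstanding request to complete"
        }
    }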

    /// Scheduled task for upgrading the proofs of transactions in the mempool.
    ///
    /// Will either perform a merge of two transactions supported by single
    /// proofs, or will upgrade a transaction proof of the type
    /// `ProofCollection` to `SingleProof`.
    ///
    /// All proving takes place in a spawned task such that it doesn't block
    /// the main loop. The MutableMainLoopState gets the JoinHandle of the
    /// spawned upgrade task such that its status can be inspected.
    async fn proof_upgrader(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
        fn attempt_upgrade(
            global_state: &GlobalState,
            now: SystemTime,
            tx_upgrade_interval: Option<Duration>,
            main_loop_state: &MutableMainLoopState,
        ) -> Result<bool> {
            let duration_since_last_upgrade =
                now.duration_since(global_state.net.last_tx_proof_upgrade_attempt)?;
            let previous_upgrade_task_is_still_running = main_loop_state
                .proof_upgrader_task
                .as_ref()
                .is_some_and(|x| !x.is_finished());
            Ok(global_state.net.sync_anchor.is_none()
                && global_state.proving_capability() == TxProvingCapability::SingleProof
                && !previous_upgrade_task_is_still_running
                && tx_upgrade_interval
                    .is_some_and(|upgrade_interval| duration_since_last_upgrade > upgrade_interval))
        }

        trace!("Running proof upgrader scheduled task");

        // Check if it's time to run the proof-upgrader, and if we're capable
        // of upgrading a transaction proof.
        let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval();
        let (upgrade_candidate, tx_origin) = {
            let global_state = self.global_state_lock.lock_guard().await;
            let now = self.now();
            if !attempt_upgrade(&global_state, now, tx_upgrade_interval, main_loop_state)? {
                trace!("Not attempting upgrade.");
                return Ok(());
            }

            debug!("Attempting to run transaction-proof-upgrade");

            // Find a candidate for proof upgrade
            let Some((upgrade_candidate, tx_origin)) = get_upgrade_task_from_mempool(&global_state)
            else {
                debug!("Found no transaction-proof to upgrade");
                return Ok(());
            };

            (upgrade_candidate, tx_origin)
        };

        info!(
            "Attempting to upgrade transaction proofs of: {}",
            upgrade_candidate.affected_txids().iter().join("; ")
        );

        // Perform the upgrade, if we're not using the prover for anything else,
        // like mining, or proving our own transaction. Running the prover takes
        // a long time (minutes), so we spawn a task for this such that we do
        // not block the main loop.
        let vm_job_queue = self.global_state_lock.vm_job_queue().clone();
        let perform_ms_update_if_needed =
            self.global_state_lock.cli().proving_capability() == TxProvingCapability::SingleProof;

        let global_state_lock_clone = self.global_state_lock.clone();
        let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
        let proof_upgrader_task =
            tokio::task::Builder::new()
                .name("proof_upgrader")
                .spawn(async move {
                    upgrade_candidate
                        .handle_upgrade(
                            &vm_job_queue,
                            tx_origin,
                            perform_ms_update_if_needed,
                            global_state_lock_clone,
                            main_to_peer_broadcast_tx_clone,
                        )
                        .await
                })?;

        main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);

        Ok(())
    }
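
    // Illustrative sketch with hypothetical parameter names: the gate that
    // `attempt_upgrade` evaluates above, spelled out over plain values. An
    // upgrade is only attempted when the node is not syncing, can produce
    // single proofs, has no upgrade task in flight, and the configured
    // interval has elapsed.
    #[allow(dead_code)]
    fn example_upgrade_gate(
        in_sync_mode: bool,
        can_produce_single_proofs: bool,
        upgrade_task_in_flight: bool,
        elapsed_since_last_attempt: Duration,
        upgrade_interval: Option<Duration>,
    ) -> bool {
        !in_sync_mode
            && can_produce_single_proofs
            && !upgrade_task_in_flight
            && upgrade_interval.is_some_and(|interval| elapsed_since_last_attempt > interval)
    }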

    /// Post-processing when a new block has arrived. Spawn a task to update
    /// transactions in the mempool. Only when the spawned task has completed
    /// should the miner continue.
    fn spawn_mempool_txs_update_job(
        &self,
        main_loop_state: &mut MutableMainLoopState,
        update_jobs: Vec<UpdateMutatorSetDataJob>,
    ) {
        // Job completion of the spawned task is communicated through the
        // channel whose receiver is stored in `update_mempool_receiver`.
        let vm_job_queue = self.global_state_lock.vm_job_queue().clone();
        if let Some(handle) = main_loop_state.update_mempool_txs_handle.as_ref() {
            handle.abort();
        }
        let (update_sender, update_receiver) =
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);

        // note: if this task is cancelled, the job will continue
        // because TritonVmJobOptions::cancel_job_rx is None.
        // see how compose_task handles cancellation in mine_loop.
        let job_options = self
            .global_state_lock
            .cli()
            .proof_job_options(TritonVmJobPriority::Highest);
        main_loop_state.update_mempool_txs_handle = Some(
            tokio::task::Builder::new()
                .name("mempool tx ms-updater")
                .spawn(async move {
                    Self::update_mempool_jobs(
                        update_jobs,
                        &vm_job_queue,
                        update_sender,
                        job_options,
                    )
                    .await
                })
                .unwrap(),
        );
        main_loop_state.update_mempool_receiver = update_receiver;
    }
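
    // Minimal sketch, illustrative only: the single-flight pattern used above.
    // Any previous updater task is aborted before a new one is spawned, and
    // the result travels back over a bounded mpsc channel that the main loop
    // polls in its `select!`. The `u64` payload is a stand-in for the real
    // `Vec<Transaction>`.
    #[allow(dead_code)]
    fn example_single_flight_worker(slot: &mut Option<JoinHandle<()>>) -> mpsc::Receiver<u64> {
        if let Some(previous) = slot.as_ref() {
            previous.abort(); // supersede the stale job; only the newest result matters
        }
        let (result_tx, result_rx) = mpsc::channel::<u64>(1);
        *slot = Some(tokio::spawn(async move {
            // stand-in for the expensive mutator-set update work
            let _ = result_tx.send(42).await;
        }));
        result_rx
    }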

    pub(crate) async fn run(
        &mut self,
        mut peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
        mut miner_to_main_rx: mpsc::Receiver<MinerToMain>,
        mut rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
        task_handles: Vec<JoinHandle<()>>,
    ) -> Result<i32> {
        // Handle incoming connections, messages from peer tasks, and messages from the mining task
        let mut main_loop_state = MutableMainLoopState::new(task_handles);

        // Set peer discovery to run every N seconds. All timers must be reset
        // every time they have run.
        let peer_discovery_timer_interval = Duration::from_secs(PEER_DISCOVERY_INTERVAL_IN_SECONDS);
        let peer_discovery_timer = time::sleep(peer_discovery_timer_interval);
        tokio::pin!(peer_discovery_timer);

        // Set synchronization to run every M seconds.
        let block_sync_interval = Duration::from_secs(SYNC_REQUEST_INTERVAL_IN_SECONDS);
        let block_sync_timer = time::sleep(block_sync_interval);
        tokio::pin!(block_sync_timer);

        // Set removal of transactions from mempool.
        let mempool_cleanup_interval = Duration::from_secs(MEMPOOL_PRUNE_INTERVAL_IN_SECS);
        let mempool_cleanup_timer = time::sleep(mempool_cleanup_interval);
        tokio::pin!(mempool_cleanup_timer);

        // Set removal of stale notifications for incoming UTXOs.
        let utxo_notification_cleanup_interval =
            Duration::from_secs(EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS);
        let utxo_notification_cleanup_timer = time::sleep(utxo_notification_cleanup_interval);
        tokio::pin!(utxo_notification_cleanup_timer);

        // Set restoration of membership proofs to run every Q seconds.
        let mp_resync_interval = Duration::from_secs(MP_RESYNC_INTERVAL_IN_SECS);
        let mp_resync_timer = time::sleep(mp_resync_interval);
        tokio::pin!(mp_resync_timer);

        // Set transaction-proof-upgrade-checker to run every R seconds.
        let tx_proof_upgrade_interval =
            Duration::from_secs(TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS);
        let tx_proof_upgrade_timer = time::sleep(tx_proof_upgrade_interval);
        tokio::pin!(tx_proof_upgrade_timer);

        // Spawn tasks to monitor for SIGTERM, SIGINT, and SIGQUIT. These
        // signals are only used on Unix systems.
        let (tx_term, mut rx_term): (mpsc::Sender<()>, mpsc::Receiver<()>) =
            tokio::sync::mpsc::channel(2);
        let (tx_int, mut rx_int): (mpsc::Sender<()>, mpsc::Receiver<()>) =
            tokio::sync::mpsc::channel(2);
        let (tx_quit, mut rx_quit): (mpsc::Sender<()>, mpsc::Receiver<()>) =
            tokio::sync::mpsc::channel(2);
        #[cfg(unix)]
        {
            use tokio::signal::unix::signal;
            use tokio::signal::unix::SignalKind;

            // Monitor for SIGTERM
            let mut sigterm = signal(SignalKind::terminate())?;
            tokio::task::Builder::new()
                .name("sigterm_handler")
                .spawn(async move {
                    if sigterm.recv().await.is_some() {
                        info!("Received SIGTERM");
                        tx_term.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGINT
            let mut sigint = signal(SignalKind::interrupt())?;
            tokio::task::Builder::new()
                .name("sigint_handler")
                .spawn(async move {
                    if sigint.recv().await.is_some() {
                        info!("Received SIGINT");
                        tx_int.send(()).await.unwrap();
                    }
                })?;

            // Monitor for SIGQUIT
            let mut sigquit = signal(SignalKind::quit())?;
            tokio::task::Builder::new()
                .name("sigquit_handler")
                .spawn(async move {
                    if sigquit.recv().await.is_some() {
                        info!("Received SIGQUIT");
                        tx_quit.send(()).await.unwrap();
                    }
                })?;
        }

        #[cfg(not(unix))]
        drop((tx_term, tx_int, tx_quit));

        let exit_code: i32 = loop {
            select! {
                Ok(()) = signal::ctrl_c() => {
                    info!("Detected Ctrl+c signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Monitor for SIGTERM, SIGINT, and SIGQUIT.
                Some(_) = rx_term.recv() => {
                    info!("Detected SIGTERM signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_int.recv() => {
                    info!("Detected SIGINT signal.");
                    break SUCCESS_EXIT_CODE;
                }
                Some(_) = rx_quit.recv() => {
                    info!("Detected SIGQUIT signal.");
                    break SUCCESS_EXIT_CODE;
                }

                // Handle incoming connections from peers
                Ok((stream, peer_address)) = self.incoming_peer_listener.accept() => {
                    // Return early if no incoming connections are accepted. Do
                    // not send application-handshake.
                    if self.global_state_lock.cli().disallow_all_incoming_peer_connections() {
                        warn!("Got incoming connection despite not accepting any. Ignoring");
                        continue;
                    }

                    let state = self.global_state_lock.lock_guard().await;
                    let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> = self.main_to_peer_broadcast_tx.subscribe();
                    let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = self.peer_task_to_main_tx.clone();
                    let own_handshake_data: HandshakeData = state.get_own_handshakedata();
                    let global_state_lock = self.global_state_lock.clone(); // bump arc refcount.
                    let incoming_peer_task_handle = tokio::task::Builder::new()
                        .name("answer_peer_wrapper")
                        .spawn(async move {
                        match answer_peer(
                            stream,
                            global_state_lock,
                            peer_address,
                            main_to_peer_broadcast_rx_clone,
                            peer_task_to_main_tx_clone,
                            own_handshake_data,
                        ).await {
                            Ok(()) => (),
                            Err(err) => error!("Got error: {:?}", err),
                        }
                    })?;
                    main_loop_state.task_handles.push(incoming_peer_task_handle);
                    main_loop_state.task_handles.retain(|th| !th.is_finished());
                }

                // Handle messages from peer tasks
                Some(msg) = peer_task_to_main_rx.recv() => {
                    debug!("Received message sent to main task.");
                    self.handle_peer_task_message(
                        msg,
                        &mut main_loop_state,
                    )
                    .await?
                }

                // Handle messages from miner task
                Some(main_message) = miner_to_main_rx.recv() => {
                    let exit_code = self.handle_miner_task_message(main_message, &mut main_loop_state).await?;

                    if let Some(exit_code) = exit_code {
                        break exit_code;
                    }

                }

                // Handle the completion of mempool tx-update jobs after new block.
                Some(ms_updated_transactions) = main_loop_state.update_mempool_receiver.recv() => {
                    self.handle_updated_mempool_txs(ms_updated_transactions).await;
                }

                // Handle messages from rpc server task
                Some(rpc_server_message) = rpc_server_to_main_rx.recv() => {
                    let shutdown_after_execution = self.handle_rpc_server_message(rpc_server_message.clone(), &mut main_loop_state).await?;
                    if shutdown_after_execution {
                        break SUCCESS_EXIT_CODE
                    }
                }

                // Handle peer discovery
                _ = &mut peer_discovery_timer => {
                    log_slow_scope!(fn_name!() + "::select::peer_discovery_timer");

                    // Check number of peers we are connected to and connect to more peers
                    // if needed.
                    debug!("Timer: peer discovery job");
                    self.prune_peers().await?;
                    self.reconnect(&mut main_loop_state).await?;
                    self.discover_peers(&mut main_loop_state).await?;

                    // Reset the timer to run this branch again in N seconds
                    peer_discovery_timer.as_mut().reset(tokio::time::Instant::now() + peer_discovery_timer_interval);
                }

                // Handle synchronization (i.e. batch-downloading of blocks)
                _ = &mut block_sync_timer => {
                    log_slow_scope!(fn_name!() + "::select::block_sync_timer");

                    trace!("Timer: block-synchronization job");
                    self.block_sync(&mut main_loop_state).await?;

                    // Reset the timer to run this branch again in M seconds
                    block_sync_timer.as_mut().reset(tokio::time::Instant::now() + block_sync_interval);
                }

                // Handle mempool cleanup, i.e. removing stale/too old txs from mempool
                _ = &mut mempool_cleanup_timer => {
                    log_slow_scope!(fn_name!() + "::select::mempool_cleanup_timer");

                    debug!("Timer: mempool-cleaner job");
                    self.global_state_lock.lock_guard_mut().await.mempool_prune_stale_transactions().await;

                    // Reset the timer to run this branch again in P seconds
                    mempool_cleanup_timer.as_mut().reset(tokio::time::Instant::now() + mempool_cleanup_interval);
                }

                // Handle incoming UTXO notification cleanup, i.e. removing stale/too old UTXO notification from pool
                _ = &mut utxo_notification_cleanup_timer => {
                    log_slow_scope!(fn_name!() + "::select::utxo_notification_cleanup_timer");

                    debug!("Timer: UTXO notification pool cleanup job");

                    // Danger: possible loss of funds.
                    //
                    // See description of prune_stale_expected_utxos().
                    //
                    // This call is disabled until such time as a thorough
                    // evaluation and perhaps reimplementation determines that
                    // it can be called safely without possible loss of funds.
                    // self.global_state_lock.lock_mut(|s| s.wallet_state.prune_stale_expected_utxos()).await;

                    utxo_notification_cleanup_timer.as_mut().reset(tokio::time::Instant::now() + utxo_notification_cleanup_interval);
                }

                // Handle membership proof resynchronization
                _ = &mut mp_resync_timer => {
                    log_slow_scope!(fn_name!() + "::select::mp_resync_timer");

                    debug!("Timer: Membership proof resync job");
                    self.global_state_lock.resync_membership_proofs().await?;

                    mp_resync_timer.as_mut().reset(tokio::time::Instant::now() + mp_resync_interval);
                }

                // Check if it's time to run the proof upgrader
                _ = &mut tx_proof_upgrade_timer => {
                    log_slow_scope!(fn_name!() + "::select::tx_upgrade_proof_timer");

                    trace!("Timer: tx-proof-upgrader");
                    self.proof_upgrader(&mut main_loop_state).await?;

                    tx_proof_upgrade_timer.as_mut().reset(tokio::time::Instant::now() + tx_proof_upgrade_interval);
                }

            }
        };

        self.graceful_shutdown(main_loop_state.task_handles).await?;
        info!("Shutdown completed.");

        Ok(exit_code)
    }
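
    // Minimal sketch, illustrative only: the timer idiom `run` relies on. A
    // pinned `tokio::time::Sleep` is awaited by mutable reference inside
    // `select!` and must be re-armed manually after every tick; without the
    // `reset`, the completed sleep would make its branch ready on every later
    // iteration of the loop.
    #[allow(dead_code)]
    async fn example_resettable_timer() {
        let interval = Duration::from_secs(1);
        let timer = time::sleep(interval);
        tokio::pin!(timer);

        for _tick in 0..3 {
            select! {
                _ = &mut timer => {
                    // periodic work would go here, then the timer is re-armed
                    timer.as_mut().reset(tokio::time::Instant::now() + interval);
                }
            }
        }
    }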

    /// Handle messages from the RPC server. Returns `true` iff the client should shut down
    /// after handling this message.
    async fn handle_rpc_server_message(
        &mut self,
        msg: RPCServerToMain,
        main_loop_state: &mut MutableMainLoopState,
    ) -> Result<bool> {
        match msg {
            RPCServerToMain::BroadcastTx(transaction) => {
                debug!(
                    "`main` received following transaction from RPC Server. {} inputs, {} outputs. Synced to mutator set hash: {}",
                    transaction.kernel.inputs.len(),
                    transaction.kernel.outputs.len(),
                    transaction.kernel.mutator_set_hash
                );

                // insert transaction into mempool
                self.global_state_lock
                    .lock_guard_mut()
                    .await
                    .mempool_insert(*transaction.clone(), TransactionOrigin::Own)
                    .await;

                // Is this a transaction we can share with peers? If so, share
                // it immediately.
                if let Ok(notification) = transaction.as_ref().try_into() {
                    self.main_to_peer_broadcast_tx
                        .send(MainToPeerTask::TransactionNotification(notification))?;
                } else {
                    // Otherwise, upgrade its proof quality, and share it by
                    // spinning up the proof upgrader.
                    let TransactionProof::Witness(primitive_witness) = transaction.proof else {
                        panic!("Expected Primitive witness. Got: {:?}", transaction.proof);
                    };

                    let vm_job_queue = self.global_state_lock.vm_job_queue().clone();

                    let proving_capability = self.global_state_lock.cli().proving_capability();
                    let upgrade_job =
                        UpgradeJob::from_primitive_witness(proving_capability, primitive_witness);

                    // note: handle_upgrade() hands off proving to the
                    //       triton-vm job queue and waits for job completion.
                    // note: handle_upgrade() broadcasts to peers on success.

                    let global_state_lock_clone = self.global_state_lock.clone();
                    let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
                    let _proof_upgrader_task = tokio::task::Builder::new()
                        .name("proof_upgrader")
                        .spawn(async move {
                        upgrade_job
                            .handle_upgrade(
                                &vm_job_queue,
                                TransactionOrigin::Own,
                                true,
                                global_state_lock_clone,
                                main_to_peer_broadcast_tx_clone,
                            )
                            .await
                    })?;

                    // main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);
                    // If transaction could not be shared immediately because
                    // it contains secret data, upgrade its proof-type.
                }

                // do not shut down
                Ok(false)
            }
            RPCServerToMain::BroadcastMempoolTransactions => {
                info!("Broadcasting transaction notifications for all shareable transactions in mempool");
                let state = self.global_state_lock.lock_guard().await;
                let txs = state.mempool.get_sorted_iter().collect_vec();
                for (txid, _) in txs {
                    // Since a read-lock is held over global state, the
                    // transaction must exist in the mempool.
                    let tx = state
                        .mempool
                        .get(txid)
                        .expect("Transaction from iter must exist in mempool");
                    let notification = TransactionNotification::try_from(tx);
                    match notification {
                        Ok(notification) => {
                            self.main_to_peer_broadcast_tx
                                .send(MainToPeerTask::TransactionNotification(notification))?;
                        }
                        Err(error) => {
                            warn!("{error}");
                        }
                    };
                }
                Ok(false)
            }
            RPCServerToMain::ProofOfWorkSolution(new_block) => {
                info!("Handling PoW solution from RPC call");

                self.handle_self_guessed_block(main_loop_state, new_block)
                    .await?;
                Ok(false)
            }
            RPCServerToMain::PauseMiner => {
                info!("Received RPC request to stop miner");

                self.main_to_miner_tx.send(MainToMiner::StopMining);
                Ok(false)
            }
            RPCServerToMain::RestartMiner => {
                info!("Received RPC request to start miner");
                self.main_to_miner_tx.send(MainToMiner::StartMining);
                Ok(false)
            }
            RPCServerToMain::Shutdown => {
                info!("Received RPC shutdown request.");

                // shut down
                Ok(true)
            }
        }
    }
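
    // Illustrative sketch with a hypothetical name: the branch taken for
    // `RPCServerToMain::BroadcastTx` above. A proof that converts into a
    // `TransactionNotification` is gossiped right away; a raw primitive
    // witness contains secret data, so it is handed to the proof upgrader and
    // only broadcast once proving succeeds.
    #[allow(dead_code)]
    fn example_share_or_upgrade(proof_is_shareable: bool) -> &'static str {
        if proof_is_shareable {
            "broadcast a TransactionNotification to peers immediately"
        } else {
            "spawn a proof-upgrader task; it broadcasts after proving succeeds"
        }
    }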

    async fn graceful_shutdown(&mut self, task_handles: Vec<JoinHandle<()>>) -> Result<()> {
        info!("Shutdown initiated.");

        // Stop mining
        self.main_to_miner_tx.send(MainToMiner::Shutdown);

        // Send 'bye' message to all peers.
        let _result = self
            .main_to_peer_broadcast_tx
            .send(MainToPeerTask::DisconnectAll());
        debug!("sent bye");

        // Flush all databases
        self.global_state_lock.flush_databases().await?;

        tokio::time::sleep(Duration::from_millis(50)).await;

        // Child processes should have finished by now. If not, abort them violently.
        task_handles.iter().for_each(|jh| jh.abort());

        // wait for all to finish.
        futures::future::join_all(task_handles).await;

        Ok(())
    }
}
1798

1799
#[cfg(test)]
1800
mod test {
1801
    use std::str::FromStr;
1802
    use std::time::UNIX_EPOCH;
1803

1804
    use tracing_test::traced_test;
1805

1806
    use super::*;
1807
    use crate::config_models::cli_args;
1808
    use crate::config_models::network::Network;
1809
    use crate::tests::shared::get_dummy_peer_incoming;
1810
    use crate::tests::shared::get_test_genesis_setup;
1811
    use crate::tests::shared::invalid_empty_block;
1812
    use crate::MINER_CHANNEL_CAPACITY;
1813

1814
    struct TestSetup {
1815
        peer_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
1816
        miner_to_main_rx: mpsc::Receiver<MinerToMain>,
1817
        rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
1818
        task_join_handles: Vec<JoinHandle<()>>,
1819
        main_loop_handler: MainLoopHandler,
1820
        main_to_peer_rx: broadcast::Receiver<MainToPeerTask>,
1821
    }
1822

1823
    async fn setup(num_init_peers_outgoing: u8, num_peers_incoming: u8) -> TestSetup {
8✔
1824
        const CHANNEL_CAPACITY_MINER_TO_MAIN: usize = 10;
1825

1826
        let network = Network::Main;
8✔
1827
        let (
1828
            main_to_peer_tx,
8✔
1829
            main_to_peer_rx,
8✔
1830
            peer_to_main_tx,
8✔
1831
            peer_to_main_rx,
8✔
1832
            mut state,
8✔
1833
            _own_handshake_data,
8✔
1834
        ) = get_test_genesis_setup(network, num_init_peers_outgoing, cli_args::Args::default())
8✔
1835
            .await
8✔
1836
            .unwrap();
8✔
1837
        assert!(
8✔
1838
            state
8✔
1839
                .lock_guard()
8✔
1840
                .await
8✔
1841
                .net
1842
                .peer_map
1843
                .iter()
8✔
1844
                .all(|(_addr, peer)| peer.connection_is_outbound()),
30✔
UNCOV
1845
            "Test assumption: All initial peers must represent outgoing connections."
×
1846
        );
1847

1848
        for i in 0..num_peers_incoming {
8✔
1849
            let peer_address =
5✔
1850
                std::net::SocketAddr::from_str(&format!("255.254.253.{}:8080", i)).unwrap();
5✔
1851
            state
5✔
1852
                .lock_guard_mut()
5✔
1853
                .await
5✔
1854
                .net
1855
                .peer_map
1856
                .insert(peer_address, get_dummy_peer_incoming(peer_address));
5✔
1857
        }
1858

1859
        let incoming_peer_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
8✔
1860

8✔
1861
        let (main_to_miner_tx, _main_to_miner_rx) =
8✔
1862
            mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
8✔
1863
        let (_miner_to_main_tx, miner_to_main_rx) =
8✔
1864
            mpsc::channel::<MinerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
8✔
1865
        let (_rpc_server_to_main_tx, rpc_server_to_main_rx) =
8✔
1866
            mpsc::channel::<RPCServerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
8✔
1867

8✔
1868
        let main_loop_handler = MainLoopHandler::new(
8✔
1869
            incoming_peer_listener,
8✔
1870
            state,
8✔
1871
            main_to_peer_tx,
8✔
1872
            peer_to_main_tx,
8✔
1873
            main_to_miner_tx,
8✔
1874
        );
8✔
1875

8✔
1876
        let task_join_handles = vec![];
8✔
1877

8✔
1878
        TestSetup {
8✔
1879
            miner_to_main_rx,
8✔
1880
            peer_to_main_rx,
8✔
1881
            rpc_server_to_main_rx,
8✔
1882
            task_join_handles,
8✔
1883
            main_loop_handler,
8✔
1884
            main_to_peer_rx,
8✔
1885
        }
8✔
1886
    }
8✔
1887

1888
    #[tokio::test]
1889
    async fn handle_self_guessed_block_new_tip() {
1✔
1890
        // A new tip is registered by main_loop. Verify correct state update.
1✔
1891
        let test_setup = setup(1, 0).await;
1✔
1892
        let TestSetup {
1✔
1893
            task_join_handles,
1✔
1894
            mut main_loop_handler,
1✔
1895
            mut main_to_peer_rx,
1✔
1896
            ..
1✔
1897
        } = test_setup;
1✔
1898
        let network = main_loop_handler.global_state_lock.cli().network;
1✔
1899
        let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles);
1✔
1900

1✔
1901
        let block1 = invalid_empty_block(&Block::genesis(network));
1✔
1902

1✔
1903
        assert!(
1✔
1904
            main_loop_handler
1✔
1905
                .global_state_lock
1✔
1906
                .lock_guard()
1✔
1907
                .await
1✔
1908
                .chain
1✔
1909
                .light_state()
1✔
1910
                .header()
1✔
1911
                .height
1✔
1912
                .is_genesis(),
1✔
1913
            "Tip must be genesis prior to handling of new block"
1✔
1914
        );
1✔
1915

1✔
1916
        let block1 = Box::new(block1);
1✔
1917
        main_loop_handler
1✔
1918
            .handle_self_guessed_block(&mut mutable_main_loop_state, block1.clone())
1✔
1919
            .await
1✔
1920
            .unwrap();
1✔
1921
        let new_block_height: u64 = main_loop_handler
1✔
1922
            .global_state_lock
1✔
1923
            .lock_guard()
1✔
1924
            .await
1✔
1925
            .chain
1✔
1926
            .light_state()
1✔
1927
            .header()
1✔
1928
            .height
1✔
1929
            .into();
1✔
1930
        assert_eq!(
1✔
1931
            1u64, new_block_height,
1✔
1932
            "Tip height must be 1 after handling of new block"
1✔
1933
        );
1✔
1934
        let msg_to_peer_loops = main_to_peer_rx.recv().await.unwrap();
1✔
1935
        if let MainToPeerTask::Block(block_to_peers) = msg_to_peer_loops {
1✔
1936
            assert_eq!(
1✔
1937
                block1, block_to_peers,
1✔
1938
                "Peer loops must have received block 1"
1✔
1939
            );
1✔
1940
        } else {
1✔
1941
            panic!("Must have sent block notification to peer loops")
1✔
1942
        }
1✔
1943
    }
1✔
1944

1945
    mod sync_mode {
1946
        use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
1947
        use test_strategy::proptest;
1948

1949
        use super::*;
1950
        use crate::tests::shared::get_dummy_socket_address;
1951

1952
        #[proptest]
256✔
1953
        fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) {
1✔
1954
            batch_request_heights_sanity(own_height);
1955
        }
1956

1957
        #[test]
1958
        fn batch_request_heights_unit() {
1✔
1959
            let own_height = 1_000_000u64;
1✔
1960
            batch_request_heights_sanity(own_height);
1✔
1961
        }
1✔
1962

1963
        fn batch_request_heights_sanity(own_height: u64) {
257✔
1964
            let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into());
257✔
1965

257✔
1966
            let mut heights_rev = heights.clone();
257✔
1967
            heights_rev.reverse();
257✔
1968
            assert!(
257✔
1969
                heights_rev.is_sorted(),
257✔
UNCOV
1970
                "Heights must be sorted from high-to-low"
×
1971
            );
1972

1973
            heights_rev.dedup();
257✔
1974
            assert_eq!(heights_rev.len(), heights.len(), "duplicates");
257✔
1975

1976
            assert_eq!(heights[0], own_height.into(), "starts with own tip height");
257✔
1977
            assert!(
257✔
1978
                heights.last().unwrap().is_genesis(),
257✔
UNCOV
1979
                "ends with genesis block"
×
1980
            );
1981
        }
257✔
1982

1983
        #[tokio::test]
UNCOV
1984
        #[traced_test]
×
1985
        async fn sync_mode_abandoned_on_global_timeout() {
1✔
1986
            let num_outgoing_connections = 0;
1✔
1987
            let num_incoming_connections = 0;
1✔
1988
            let test_setup = setup(num_outgoing_connections, num_incoming_connections).await;
1✔
1989
            let TestSetup {
1990
                task_join_handles,
1✔
1991
                mut main_loop_handler,
1✔
1992
                ..
1✔
1993
            } = test_setup;
1✔
1994

1✔
1995
            let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles);
1✔
1996

1✔
1997
            main_loop_handler
1✔
1998
                .block_sync(&mut mutable_main_loop_state)
1✔
1999
                .await
1✔
2000
                .expect("Must return OK when no sync mode is set");
1✔
2001

1✔
2002
            // Mock that we are in a valid sync state
1✔
2003
            let claimed_max_height = 1_000u64.into();
1✔
2004
            let claimed_max_pow = ProofOfWork::new([100; 6]);
1✔
2005
            main_loop_handler
1✔
2006
                .global_state_lock
1✔
2007
                .lock_guard_mut()
1✔
2008
                .await
1✔
2009
                .net
1✔
2010
                .sync_anchor = Some(SyncAnchor::new(
1✔
2011
                claimed_max_pow,
1✔
2012
                MmrAccumulator::new_from_leafs(vec![]),
1✔
2013
            ));
1✔
2014
            mutable_main_loop_state.sync_state.peer_sync_states.insert(
1✔
2015
                get_dummy_socket_address(0),
1✔
2016
                PeerSynchronizationState::new(claimed_max_height, claimed_max_pow),
1✔
2017
            );
1✔
2018

2019
            let sync_start_time = main_loop_handler
1✔
2020
                .global_state_lock
1✔
2021
                .lock_guard()
1✔
2022
                .await
1✔
2023
                .net
2024
                .sync_anchor
2025
                .as_ref()
1✔
2026
                .unwrap()
1✔
2027
                .updated;
1✔
2028
            main_loop_handler
1✔
2029
                .block_sync(&mut mutable_main_loop_state)
1✔
2030
                .await
1✔
2031
                .expect("Must return OK when sync mode has not timed out yet");
1✔
2032
            assert!(
1✔
2033
                main_loop_handler
1✔
2034
                    .global_state_lock
1✔
2035
                    .lock_guard()
1✔
2036
                    .await
1✔
2037
                    .net
2038
                    .sync_anchor
2039
                    .is_some(),
1✔
UNCOV
2040
                "Sync mode must still be set before timeout has occurred"
×
2041
            );
2042

2043
            assert_eq!(
1✔
2044
                sync_start_time,
1✔
2045
                main_loop_handler
1✔
2046
                    .global_state_lock
1✔
2047
                    .lock_guard()
1✔
2048
                    .await
1✔
2049
                    .net
2050
                    .sync_anchor
2051
                    .as_ref()
1✔
2052
                    .unwrap()
1✔
2053
                    .updated,
UNCOV
2054
                "timestamp may not be updated without state change"
×
2055
            );
2056

2057
            // Mock that sync-mode has timed out
2058
            main_loop_handler = main_loop_handler.with_mocked_time(
1✔
2059
                SystemTime::now()
1✔
2060
                    + Duration::from_secs(GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS + 1),
1✔
2061
            );
1✔
2062

1✔
2063
            main_loop_handler
1✔
2064
                .block_sync(&mut mutable_main_loop_state)
1✔
2065
                .await
1✔
2066
                .expect("Must return OK when sync mode has timed out");
1✔
2067
            assert!(
1✔
2068
                main_loop_handler
1✔
2069
                    .global_state_lock
1✔
2070
                    .lock_guard()
1✔
2071
                    .await
1✔
2072
                    .net
1✔
2073
                    .sync_anchor
1✔
2074
                    .is_none(),
1✔
2075
                "Sync mode must be unset on timeout"
1✔
2076
            );
1✔
2077
        }
1✔
2078
    }
2079

2080
    mod proof_upgrader {
2081
        use super::*;
2082
        use crate::job_queue::triton_vm::TritonVmJobQueue;
2083
        use crate::models::blockchain::transaction::Transaction;
2084
        use crate::models::blockchain::transaction::TransactionProof;
2085
        use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
2086
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
2087
        use crate::models::proof_abstractions::timestamp::Timestamp;
2088
        use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
2089

2090
        async fn tx_no_outputs(
1✔
2091
            global_state_lock: &GlobalStateLock,
1✔
2092
            tx_proof_type: TxProvingCapability,
1✔
2093
            fee: NativeCurrencyAmount,
1✔
2094
        ) -> Transaction {
1✔
2095
            let change_key = global_state_lock
1✔
2096
                .lock_guard()
1✔
2097
                .await
1✔
2098
                .wallet_state
2099
                .wallet_entropy
2100
                .nth_generation_spending_key_for_tests(0);
1✔
2101
            let in_seven_months = global_state_lock
1✔
2102
                .lock_guard()
1✔
2103
                .await
1✔
2104
                .chain
2105
                .light_state()
1✔
2106
                .header()
1✔
2107
                .timestamp
1✔
2108
                + Timestamp::months(7);
1✔
2109

2110
            let global_state = global_state_lock.lock_guard().await;
1✔
2111
            global_state
1✔
2112
                .create_transaction_with_prover_capability(
1✔
2113
                    vec![].into(),
1✔
2114
                    change_key.into(),
1✔
2115
                    UtxoNotificationMedium::OffChain,
1✔
2116
                    fee,
1✔
2117
                    in_seven_months,
1✔
2118
                    tx_proof_type,
1✔
2119
                    &TritonVmJobQueue::dummy(),
1✔
2120
                )
1✔
2121
                .await
1✔
2122
                .unwrap()
1✔
2123
                .0
1✔
2124
        }
1✔
2125

2126
        #[tokio::test]
UNCOV
2127
        #[traced_test]
×
2128
        async fn upgrade_proof_collection_to_single_proof_foreign_tx() {
1✔
2129
            let num_outgoing_connections = 0;
1✔
2130
            let num_incoming_connections = 0;
1✔
2131
            let test_setup = setup(num_outgoing_connections, num_incoming_connections).await;
1✔
2132
            let TestSetup {
2133
                peer_to_main_rx,
1✔
2134
                miner_to_main_rx,
1✔
2135
                rpc_server_to_main_rx,
1✔
2136
                task_join_handles,
1✔
2137
                mut main_loop_handler,
1✔
2138
                mut main_to_peer_rx,
1✔
2139
            } = test_setup;
1✔
2140

1✔
2141
            // Force instance to create SingleProofs, otherwise CI and other
1✔
2142
            // weak machines fail.
1✔
2143
            let mocked_cli = cli_args::Args {
1✔
2144
                tx_proving_capability: Some(TxProvingCapability::SingleProof),
1✔
2145
                tx_proof_upgrade_interval: 100, // seconds
1✔
2146
                ..Default::default()
1✔
2147
            };
1✔
2148

1✔
2149
            main_loop_handler
1✔
2150
                .global_state_lock
1✔
2151
                .set_cli(mocked_cli)
1✔
2152
                .await;
1✔
2153
            let mut main_loop_handler = main_loop_handler.with_mocked_time(SystemTime::now());
1✔
2154
            let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles);
1✔
2155

1✔
2156
            assert!(
1✔
2157
                main_loop_handler
1✔
2158
                    .proof_upgrader(&mut mutable_main_loop_state)
1✔
2159
                    .await
1✔
2160
                    .is_ok(),
1✔
UNCOV
2161
                "Scheduled task returns OK when run on empty mempool"
×
2162
            );
2163

2164
            let fee = NativeCurrencyAmount::coins(1);
1✔
2165
            let proof_collection_tx = tx_no_outputs(
1✔
2166
                &main_loop_handler.global_state_lock,
1✔
2167
                TxProvingCapability::ProofCollection,
1✔
2168
                fee,
1✔
2169
            )
1✔
2170
            .await;
1✔
2171

2172
            main_loop_handler
1✔
2173
                .global_state_lock
1✔
2174
                .lock_guard_mut()
1✔
2175
                .await
1✔
2176
                .mempool_insert(proof_collection_tx.clone(), TransactionOrigin::Foreign)
1✔
2177
                .await;
1✔
2178

2179
            assert!(
1✔
2180
                main_loop_handler
1✔
2181
                    .proof_upgrader(&mut mutable_main_loop_state)
1✔
2182
                    .await
1✔
2183
                    .is_ok(),
1✔
UNCOV
2184
                "Scheduled task returns OK when it's not yet time to upgrade"
×
2185
            );
2186

2187
            assert!(
1✔
UNCOV
2188
                matches!(
×
2189
                    main_loop_handler
1✔
2190
                        .global_state_lock
1✔
2191
                        .lock_guard()
1✔
2192
                        .await
1✔
2193
                        .mempool
2194
                        .get(proof_collection_tx.kernel.txid())
1✔
2195
                        .unwrap()
1✔
2196
                        .proof,
2197
                    TransactionProof::ProofCollection(_)
2198
                ),
UNCOV
2199
                "Proof in mempool must still be of type proof collection"
×
2200
            );
2201

2202
            // Mock that enough time has passed to perform the upgrade. Then
2203
            // perform the upgrade.
2204
            let mut main_loop_handler =
1✔
2205
                main_loop_handler.with_mocked_time(SystemTime::now() + Duration::from_secs(300));
1✔
2206
            assert!(
1✔
2207
                main_loop_handler
1✔
2208
                    .proof_upgrader(&mut mutable_main_loop_state)
1✔
2209
                    .await
1✔
2210
                    .is_ok(),
1✔
UNCOV
2211
                "Scheduled task must return OK when it's time to upgrade"
×
2212
            );
2213

2214
            // Wait for upgrade task to finish.
2215
            let handle = mutable_main_loop_state.proof_upgrader_task.unwrap().await;
1✔
2216
            assert!(
1✔
2217
                handle.is_ok(),
1✔
UNCOV
2218
                "Proof-upgrade task must finish successfully."
×
2219
            );
2220

2221
            // At this point there should be one transaction in the mempool,
2222
            // which is (if all is well) the merger of the ProofCollection
2223
            // transaction inserted above and one of the upgrader's fee
2224
            // gobblers. The point is that this transaction is a SingleProof
2225
            // transaction, so test that.
2226

2227
            let (merged_txid, _) = main_loop_handler
1✔
2228
                .global_state_lock
1✔
2229
                .lock_guard()
1✔
2230
                .await
1✔
2231
                .mempool
2232
                .get_sorted_iter()
1✔
2233
                .next_back()
1✔
2234
                .expect("mempool should contain one item here");
1✔
2235

1✔
2236
            assert!(
1✔
UNCOV
2237
                matches!(
×
2238
                    main_loop_handler
1✔
2239
                        .global_state_lock
1✔
2240
                        .lock_guard()
1✔
2241
                        .await
1✔
2242
                        .mempool
2243
                        .get(merged_txid)
1✔
2244
                        .unwrap()
1✔
2245
                        .proof,
2246
                    TransactionProof::SingleProof(_)
2247
                ),
UNCOV
2248
                "Proof in mempool must now be of type single proof"
×
2249
            );
2250

2251
            match main_to_peer_rx.recv().await {
1✔
2252
                Ok(MainToPeerTask::TransactionNotification(tx_noti)) => {
1✔
2253
                    assert_eq!(merged_txid, tx_noti.txid);
1✔
2254
                    assert_eq!(TransactionProofQuality::SingleProof, tx_noti.proof_quality);
1✔
2255
                },
2256
                other => panic!("Must have sent transaction notification to peer loop after successful proof upgrade. Got:\n{other:?}"),
×
2257
            }
2258

2259
            // These values are kept alive as the transmission-counterpart will
2260
            // otherwise fail on `send`.
2261
            drop(peer_to_main_rx);
1✔
2262
            drop(miner_to_main_rx);
1✔
2263
            drop(rpc_server_to_main_rx);
1✔
2264
            drop(main_to_peer_rx);
1✔
2265
        }
1✔
2266
    }
2267

2268
    mod peer_discovery {
2269
        use super::*;
2270

2271
        #[tokio::test]
UNCOV
2272
        #[traced_test]
×
2273
        async fn prune_peers_too_many_connections() {
1✔
2274
            let num_init_peers_outgoing = 10;
1✔
2275
            let num_init_peers_incoming = 4;
1✔
2276
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2277
            let TestSetup {
2278
                mut main_to_peer_rx,
1✔
2279
                mut main_loop_handler,
1✔
2280
                ..
1✔
2281
            } = test_setup;
1✔
2282

1✔
2283
            let mocked_cli = cli_args::Args {
1✔
2284
                max_num_peers: num_init_peers_outgoing as usize,
1✔
2285
                ..Default::default()
1✔
2286
            };
1✔
2287

1✔
2288
            main_loop_handler
1✔
2289
                .global_state_lock
1✔
2290
                .set_cli(mocked_cli)
1✔
2291
                .await;
1✔
2292

2293
            main_loop_handler.prune_peers().await.unwrap();
1✔
2294
            assert_eq!(4, main_to_peer_rx.len());
1✔
2295
            for _ in 0..4 {
5✔
2296
                let peer_msg = main_to_peer_rx.recv().await.unwrap();
4✔
2297
                assert!(matches!(peer_msg, MainToPeerTask::Disconnect(_)))
4✔
2298
            }
1✔
2299
        }
1✔
2300

2301
        #[tokio::test]
UNCOV
2302
        #[traced_test]
×
2303
        async fn prune_peers_not_too_many_connections() {
1✔
2304
            let num_init_peers_outgoing = 10;
1✔
2305
            let num_init_peers_incoming = 1;
1✔
2306
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2307
            let TestSetup {
2308
                main_to_peer_rx,
1✔
2309
                mut main_loop_handler,
1✔
2310
                ..
1✔
2311
            } = test_setup;
1✔
2312

1✔
2313
            let mocked_cli = cli_args::Args {
1✔
2314
                max_num_peers: 200,
1✔
2315
                ..Default::default()
1✔
2316
            };
1✔
2317

1✔
2318
            main_loop_handler
1✔
2319
                .global_state_lock
1✔
2320
                .set_cli(mocked_cli)
1✔
2321
                .await;
1✔
2322

2323
            main_loop_handler.prune_peers().await.unwrap();
1✔
2324
            assert!(main_to_peer_rx.is_empty());
1✔
2325
        }
1✔
2326

2327
        #[tokio::test]
2328
        #[traced_test]
1✔
2329
        async fn skip_peer_discovery_if_peer_limit_is_exceeded() {
1✔
2330
            let num_init_peers_outgoing = 2;
1✔
2331
            let num_init_peers_incoming = 0;
1✔
2332
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2333
            let TestSetup {
2334
                task_join_handles,
1✔
2335
                mut main_loop_handler,
1✔
2336
                ..
1✔
2337
            } = test_setup;
1✔
2338

1✔
2339
            let mocked_cli = cli_args::Args {
1✔
2340
                max_num_peers: 0,
1✔
2341
                ..Default::default()
1✔
2342
            };
1✔
2343
            main_loop_handler
1✔
2344
                .global_state_lock
1✔
2345
                .set_cli(mocked_cli)
1✔
2346
                .await;
1✔
2347
            main_loop_handler
1✔
2348
                .discover_peers(&mut MutableMainLoopState::new(task_join_handles))
1✔
2349
                .await
1✔
2350
                .unwrap();
1✔
2351

1✔
2352
            assert!(logs_contain("Skipping peer discovery."));
1✔
2353
        }
1✔
2354

2355
        #[tokio::test]
2356
        #[traced_test]
1✔
2357
        async fn performs_peer_discovery_on_few_connections() {
1✔
2358
            let num_init_peers_outgoing = 2;
1✔
2359
            let num_init_peers_incoming = 0;
1✔
2360
            let TestSetup {
2361
                task_join_handles,
1✔
2362
                mut main_loop_handler,
1✔
2363
                mut main_to_peer_rx,
1✔
2364
                peer_to_main_rx: _keep_channel_open,
1✔
2365
                ..
2366
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2367

2368
            // Set CLI to attempt to make more connections
2369
            let mocked_cli = cli_args::Args {
1✔
2370
                max_num_peers: 10,
1✔
2371
                ..Default::default()
1✔
2372
            };
1✔
2373
            main_loop_handler
1✔
2374
                .global_state_lock
1✔
2375
                .set_cli(mocked_cli)
1✔
2376
                .await;
1✔
2377
            main_loop_handler
1✔
2378
                .discover_peers(&mut MutableMainLoopState::new(task_join_handles))
1✔
2379
                .await
1✔
2380
                .unwrap();
1✔
2381

1✔
2382
            let peer_discovery_sent_messages_on_peer_channel = main_to_peer_rx.try_recv().is_ok();
            assert!(peer_discovery_sent_messages_on_peer_channel);
            assert!(logs_contain("Performing peer discovery"));
        }
    }

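    /// Sanity check: `min_by` ranks the older `SystemTime` first. The
    /// longest-lived-peer selection in `bootstrapper_mode` below relies on
    /// this ordering.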
    #[test]
    fn older_systemtime_ranks_first() {
        let start = UNIX_EPOCH;
        let other = UNIX_EPOCH + Duration::from_secs(1000);
        let mut instants = [start, other];

        assert_eq!(
            start,
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
        );

        instants.reverse();

        assert_eq!(
            start,
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
        );
    }

    mod bootstrapper_mode {
        use rand::Rng;

        use super::*;
        use crate::models::peer::PeerMessage;
        use crate::models::peer::TransferConnectionStatus;
        use crate::tests::shared::get_dummy_peer_connection_data_genesis;
        use crate::tests::shared::to_bytes;

        #[tokio::test]
        #[traced_test]
        async fn disconnect_from_oldest_peer_upon_connection_request() {
            // Set up a node in bootstrapper mode, connected to one peer fewer
            // than the maximum. Initiate an incoming connection request and
            // verify that the oldest of the existing connections is dropped.

            let network = Network::Main;
            let num_init_peers_outgoing = 5;
            let num_init_peers_incoming = 0;
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
            let TestSetup {
                mut peer_to_main_rx,
                miner_to_main_rx: _,
                rpc_server_to_main_rx: _,
                task_join_handles,
                mut main_loop_handler,
                mut main_to_peer_rx,
            } = test_setup;

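            // One slot remains below the limit; in bootstrap mode the node is
            // nonetheless expected to drop its longest-lived connection when
            // a new peer connects.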
            let mocked_cli = cli_args::Args {
                max_num_peers: usize::from(num_init_peers_outgoing) + 1,
                bootstrap: true,
                network,
                ..Default::default()
            };
            main_loop_handler
                .global_state_lock
                .set_cli(mocked_cli)
                .await;

            let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles);

            // check sanity: at startup, we are connected to the initial number of peers
            assert_eq!(
                usize::from(num_init_peers_outgoing),
                main_loop_handler
                    .global_state_lock
                    .lock_guard()
                    .await
                    .net
                    .peer_map
                    .len()
            );

            // randomize "connection established" timestamps
2463
            let mut rng = rand::rng();
1✔
2464
            let now = SystemTime::now();
1✔
2465
            let now_as_unix_timestamp = now.duration_since(UNIX_EPOCH).unwrap();
1✔
2466
            main_loop_handler
1✔
2467
                .global_state_lock
1✔
2468
                .lock_guard_mut()
1✔
2469
                .await
1✔
2470
                .net
2471
                .peer_map
2472
                .iter_mut()
1✔
2473
                .for_each(|(_socket_address, peer_info)| {
5✔
2474
                    peer_info.set_connection_established(
5✔
2475
                        UNIX_EPOCH
5✔
2476
                            + Duration::from_millis(
5✔
2477
                                rng.random_range(0..(now_as_unix_timestamp.as_millis() as u64)),
5✔
2478
                            ),
5✔
2479
                    );
5✔
2480
                });
5✔
2481

2482
            // compute which peer will be dropped, for later reference
2483
            let expected_drop_peer_socket_address = main_loop_handler
1✔
2484
                .global_state_lock
1✔
2485
                .lock_guard()
1✔
2486
                .await
1✔
2487
                .net
2488
                .peer_map
2489
                .iter()
1✔
2490
                .min_by(|l, r| {
4✔
2491
                    l.1.connection_established()
4✔
2492
                        .cmp(&r.1.connection_established())
4✔
2493
                })
4✔
2494
                .map(|(socket_address, _peer_info)| socket_address)
1✔
2495
                .copied()
1✔
2496
                .unwrap();
1✔
2497

1✔
2498
            // simulate incoming connection
            let (peer_handshake_data, peer_socket_address) =
                get_dummy_peer_connection_data_genesis(network, 1);
            let own_handshake_data = main_loop_handler
                .global_state_lock
                .lock_guard()
                .await
                .get_own_handshakedata();
            assert_eq!(peer_handshake_data.network, own_handshake_data.network);
            assert_eq!(peer_handshake_data.version, own_handshake_data.version);
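            // The mock stream scripts the wire exchange: the peer's handshake
            // request is read from the stream, and our handshake response
            // plus the connection acceptance are expected as writes.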
            let mock_stream = tokio_test::io::Builder::new()
                .read(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_REQUEST.to_vec(),
                        peer_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::Handshake(Box::new((
                        crate::MAGIC_STRING_RESPONSE.to_vec(),
                        own_handshake_data.clone(),
                    ))))
                    .unwrap(),
                )
                .write(
                    &to_bytes(&PeerMessage::ConnectionStatus(
                        TransferConnectionStatus::Accepted,
                    ))
                    .unwrap(),
                )
                .build();
            let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone();
            let global_state_lock_clone = main_loop_handler.global_state_lock.clone();
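            // The spawned peer task listens on its own mocked broadcast
            // channel, so the `main_to_peer_rx` held by this test only sees
            // what the main loop under test sends.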
            let (_main_to_peer_tx_mock, main_to_peer_rx_mock) = tokio::sync::broadcast::channel(10);
            let incoming_peer_task_handle = tokio::task::Builder::new()
                .name("answer_peer_wrapper")
                .spawn(async move {
                    match answer_peer(
                        mock_stream,
                        global_state_lock_clone,
                        peer_socket_address,
                        main_to_peer_rx_mock,
                        peer_to_main_tx_clone,
                        own_handshake_data,
                    )
                    .await
                    {
                        Ok(()) => (),
                        Err(err) => error!("Got error: {:?}", err),
                    }
                })
                .unwrap();

            // `answer_peer_wrapper` should send a
            // `DisconnectFromLongestLivedPeer` message to main
            let peer_to_main_message = peer_to_main_rx.recv().await.unwrap();
            assert!(matches!(
                peer_to_main_message,
                PeerTaskToMain::DisconnectFromLongestLivedPeer,
            ));

            // process this message
            main_loop_handler
                .handle_peer_task_message(peer_to_main_message, &mut mutable_main_loop_state)
                .await
                .unwrap();

            // main loop should send a `Disconnect` message
            let main_to_peers_message = main_to_peer_rx.recv().await.unwrap();
            let MainToPeerTask::Disconnect(observed_drop_peer_socket_address) =
                main_to_peers_message
            else {
                panic!("Expected disconnect, got {main_to_peers_message:?}");
            };

            // match the observed droppee against the expectation
            assert_eq!(
                expected_drop_peer_socket_address,
                observed_drop_peer_socket_address,
            );
            println!("Dropped connection with {expected_drop_peer_socket_address}.");

            // don't forget to terminate the peer task, which is still running
            incoming_peer_task_handle.abort();
        }
    }
}