• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Neptune-Crypto / neptune-core / 13897743430

14 Mar 2025 03:15PM UTC coverage: 43.131% (-41.0%) from 84.172%
13897743430

push

github

jan-ferdinand
feat: Conditionally connect to bootstrap nodes

In order to help newcomers to connect to peers on the network, certain
nodes act as bootstrap nodes. Those nodes tend to be well known, and
users tend to specify them as potential peers via command line
arguments.

Previously, connections to potential peers specified as command line
arguments were always re-initiated when dropped. This can cause an
involuntary denial-of-service of those well-known bootstrap nodes.

Now, a well-connected node does not initiate connections with bootstrap
nodes.

119 of 164 new or added lines in 4 files covered. (72.56%)

656 existing lines in 10 files now uncovered.

2926 of 6784 relevant lines covered (43.13%)

1571562.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.7
/src/main_loop.rs
1
pub mod proof_upgrader;
2

3
use std::cmp::Ordering;
4
use std::collections::HashMap;
5
use std::net::SocketAddr;
6
use std::time::Duration;
7
use std::time::SystemTime;
8

9
use anyhow::Result;
10
use itertools::Itertools;
11
use proof_upgrader::get_upgrade_task_from_mempool;
12
use proof_upgrader::UpdateMutatorSetDataJob;
13
use proof_upgrader::UpgradeJob;
14
use rand::prelude::IteratorRandom;
15
use rand::seq::IndexedRandom;
16
use tokio::net::TcpListener;
17
use tokio::select;
18
use tokio::signal;
19
use tokio::sync::broadcast;
20
use tokio::sync::mpsc;
21
use tokio::task::JoinHandle;
22
use tokio::time;
23
use tracing::debug;
24
use tracing::error;
25
use tracing::info;
26
use tracing::trace;
27
use tracing::warn;
28

29
use crate::config_models::cli_args;
30
use crate::connect_to_peers::answer_peer;
31
use crate::connect_to_peers::call_peer;
32
use crate::job_queue::triton_vm::TritonVmJobPriority;
33
use crate::job_queue::triton_vm::TritonVmJobQueue;
34
use crate::macros::fn_name;
35
use crate::macros::log_slow_scope;
36
use crate::models::blockchain::block::block_header::BlockHeader;
37
use crate::models::blockchain::block::block_height::BlockHeight;
38
use crate::models::blockchain::block::difficulty_control::ProofOfWork;
39
use crate::models::blockchain::block::Block;
40
use crate::models::blockchain::transaction::Transaction;
41
use crate::models::blockchain::transaction::TransactionProof;
42
use crate::models::channel::MainToMiner;
43
use crate::models::channel::MainToPeerTask;
44
use crate::models::channel::MainToPeerTaskBatchBlockRequest;
45
use crate::models::channel::MinerToMain;
46
use crate::models::channel::PeerTaskToMain;
47
use crate::models::channel::RPCServerToMain;
48
use crate::models::peer::bootstrap_info::BootstrapStatus;
49
use crate::models::peer::handshake_data::HandshakeData;
50
use crate::models::peer::peer_info::PeerInfo;
51
use crate::models::peer::transaction_notification::TransactionNotification;
52
use crate::models::peer::InstanceId;
53
use crate::models::peer::PeerSynchronizationState;
54
use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions;
55
use crate::models::state::block_proposal::BlockProposal;
56
use crate::models::state::mempool::TransactionOrigin;
57
use crate::models::state::networking_state::NetworkingState;
58
use crate::models::state::networking_state::SyncAnchor;
59
use crate::models::state::tx_proving_capability::TxProvingCapability;
60
use crate::models::state::GlobalState;
61
use crate::models::state::GlobalStateLock;
62
use crate::SUCCESS_EXIT_CODE;
63

64
const PEER_DISCOVERY_INTERVAL_IN_SECONDS: u64 = 120;
65
const SYNC_REQUEST_INTERVAL_IN_SECONDS: u64 = 3;
66
const MEMPOOL_PRUNE_INTERVAL_IN_SECS: u64 = 30 * 60; // 30mins
67
const MP_RESYNC_INTERVAL_IN_SECS: u64 = 59;
68
const EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS: u64 = 19 * 60; // 19 mins
69

70
/// Interval for when transaction-upgrade checker is run. Note that this does
71
/// *not* define how often a transaction-proof upgrade is actually performed.
72
/// Only how often we check if we're ready to perform an upgrade.
73
const TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS: u64 = 60; // 1 minute
74

75
const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;
76

77
/// Number of seconds within which an individual peer is expected to respond
78
/// to a synchronization request.
79
const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
80
    SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS;
81

82
/// Number of seconds that a synchronization may run without any progress.
83
const GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
84
    INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS * 4;
85

86
const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
87
pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;
88
const TX_UPDATER_CHANNEL_CAPACITY: usize = 1;
89

90
/// Wraps a transmission channel.
91
///
92
/// To be used for the transmission channel to the miner, because
93
///  a) the miner might not exist in which case there would be no-one to empty
94
///     the channel; and
95
///  b) contrary to other channels, transmission failures here are not critical.
96
#[derive(Debug)]
97
struct MainToMinerChannel(Option<mpsc::Sender<MainToMiner>>);
98

99
impl MainToMinerChannel {
100
    /// Send a message to the miner task (if any).
101
    fn send(&self, message: MainToMiner) {
×
102
        // Do no use the async `send` function because it blocks until there
103
        // is spare capacity on the channel. Messages to the miner are not
104
        // critical so if there is no capacity left, just log an error
105
        // message.
106
        if let Some(channel) = &self.0 {
×
107
            if let Err(e) = channel.try_send(message) {
×
108
                error!("Failed to send pause message to miner thread:\n{e}");
×
109
            }
×
110
        }
×
111
    }
×
112
}
113

114
/// MainLoop is the immutable part of the input for the main loop function
115
#[derive(Debug)]
116
pub struct MainLoopHandler {
117
    incoming_peer_listener: TcpListener,
118
    global_state_lock: GlobalStateLock,
119

120
    // note: broadcast::Sender::send() does not block
121
    main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
122

123
    // note: mpsc::Sender::send() blocks if channel full.
124
    // locks should not be held across it.
125
    peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
126

127
    // note: MainToMinerChannel::send() does not block.  might log error.
128
    main_to_miner_tx: MainToMinerChannel,
129

130
    #[cfg(test)]
131
    mock_now: Option<SystemTime>,
132
}
133

134
/// The mutable part of the main loop function
135
struct MutableMainLoopState {
136
    /// Information used to batch-download blocks.
137
    sync_state: SyncState,
138

139
    /// Information about potential peers for new connections.
140
    potential_peers: PotentialPeersState,
141

142
    /// A list of joinhandles to spawned tasks.
143
    task_handles: Vec<JoinHandle<()>>,
144

145
    /// A joinhandle to a task performing transaction-proof upgrades.
146
    proof_upgrader_task: Option<JoinHandle<()>>,
147

148
    /// A joinhandle to a task running the update of the mempool transactions.
149
    update_mempool_txs_handle: Option<JoinHandle<()>>,
150

151
    /// A channel that the task updating mempool transactions can use to
152
    /// communicate its result.
153
    update_mempool_receiver: mpsc::Receiver<Vec<Transaction>>,
154
}
155

156
impl MutableMainLoopState {
157
    fn new(cli_args: &cli_args::Args, task_handles: Vec<JoinHandle<()>>) -> Self {
6✔
158
        let (_dummy_sender, dummy_receiver) = mpsc::channel(TX_UPDATER_CHANNEL_CAPACITY);
6✔
159
        Self {
6✔
160
            sync_state: SyncState::default(),
6✔
161
            potential_peers: PotentialPeersState::new(cli_args),
6✔
162
            task_handles,
6✔
163
            proof_upgrader_task: None,
6✔
164
            update_mempool_txs_handle: None,
6✔
165
            update_mempool_receiver: dummy_receiver,
6✔
166
        }
6✔
167
    }
6✔
168
}
169

170
/// handles batch-downloading of blocks if we are more than n blocks behind
171
#[derive(Default, Debug)]
172
struct SyncState {
173
    peer_sync_states: HashMap<SocketAddr, PeerSynchronizationState>,
174
    last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>,
175
}
176

177
impl SyncState {
178
    fn record_request(
1✔
179
        &mut self,
1✔
180
        requested_block_height: BlockHeight,
1✔
181
        peer: SocketAddr,
1✔
182
        now: SystemTime,
1✔
183
    ) {
1✔
184
        self.last_sync_request = Some((now, requested_block_height, peer));
1✔
185
    }
1✔
186

187
    /// Return a list of peers that have reported to be in possession of blocks
188
    /// with a PoW above a threshold.
189
    fn get_potential_peers_for_sync_request(&self, threshold_pow: ProofOfWork) -> Vec<SocketAddr> {
2✔
190
        self.peer_sync_states
2✔
191
            .iter()
2✔
192
            .filter(|(_sa, sync_state)| sync_state.claimed_max_pow > threshold_pow)
2✔
193
            .map(|(sa, _)| *sa)
2✔
194
            .collect()
2✔
195
    }
2✔
196

197
    /// Determine if a peer should be sanctioned for failing to respond to a
198
    /// synchronization request fast enough. Also determine if a new request
199
    /// should be made or the previous one should be allowed to run for longer.
200
    ///
201
    /// Returns (peer to be sanctioned, attempt new request).
202
    fn get_status_of_last_request(
1✔
203
        &self,
1✔
204
        current_block_height: BlockHeight,
1✔
205
        now: SystemTime,
1✔
206
    ) -> (Option<SocketAddr>, bool) {
1✔
207
        // A peer is sanctioned if no answer has been received after N times the sync request
1✔
208
        // interval.
1✔
209
        match self.last_sync_request {
1✔
210
            None => {
211
                // No sync request has been made since startup of program
212
                (None, true)
1✔
213
            }
214
            Some((req_time, requested_height, peer_sa)) => {
×
215
                if requested_height < current_block_height {
×
216
                    // The last sync request updated the state
217
                    (None, true)
×
218
                } else if req_time
×
219
                    + Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS)
×
220
                    < now
×
221
                {
222
                    // The last sync request was not answered, sanction peer
223
                    // and make a new sync request.
224
                    (Some(peer_sa), true)
×
225
                } else {
226
                    // The last sync request has not yet been answered. But it has
227
                    // not timed out yet.
228
                    (None, false)
×
229
                }
230
            }
231
        }
232
    }
1✔
233
}
234

235
/// A potential peer in the process of peer discovery.
236
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
237
struct PeerCandidate {
238
    address: SocketAddr,
239
    id: InstanceId,
240
    distance: u8,
241
}
242

243
impl PeerCandidate {
244
    fn new(address: SocketAddr, id: InstanceId, distance: u8) -> Self {
2✔
245
        Self {
2✔
246
            address,
2✔
247
            id,
2✔
248
            distance,
2✔
249
        }
2✔
250
    }
2✔
251
}
252

253
/// The potential peers in the process of peer discovery.
254
#[derive(Debug, Clone)]
255
struct PotentialPeersState {
256
    /// A copy of the corresponding field from the
257
    /// [command line arguments](cli_args::Args).
258
    max_num_peers: usize,
259

260
    candidates: HashMap<SocketAddr, PeerCandidate>,
261
}
262

263
impl PotentialPeersState {
264
    // Takes (a reference to) the entire command line arguments to facilitate
265
    // future refactors: in case additional information is needed, is can just
266
    // be grabbed here.
267
    // Storing the entire argument list is either a potentially big cloning
268
    // operation or a big lifetime challenge.
269
    fn new(cli_args: &cli_args::Args) -> Self {
8✔
270
        Self {
8✔
271
            max_num_peers: cli_args.max_num_peers,
8✔
272
            candidates: HashMap::new(),
8✔
273
        }
8✔
274
    }
8✔
275

276
    fn add(&mut self, candidate: PeerCandidate) {
2✔
277
        // always use the lowest observed distance for a given candidate
278
        if let Some(existing_candidate) = self.candidates.get_mut(&candidate.address) {
2✔
NEW
279
            if candidate.distance < existing_candidate.distance {
×
NEW
280
                *existing_candidate = candidate;
×
NEW
281
            }
×
282
            return;
×
283
        }
2✔
284

2✔
285
        // if the candidate list is full, remove random entries
2✔
286
        let max_num_candidates =
2✔
287
            self.max_num_peers * POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS;
2✔
288
        while self.candidates.len() >= max_num_candidates {
2✔
NEW
289
            let Some(&random_candidate) = self.candidates.keys().choose(&mut rand::rng()) else {
×
NEW
290
                warn!("Failed to shrink full potential peer list: couldn't find element to remove");
×
NEW
291
                return;
×
292
            };
NEW
293
            if self.candidates.remove(&random_candidate).is_none() {
×
NEW
294
                warn!("Failed to shrink full potential peer list: couldn't remove chosen element");
×
NEW
295
                return;
×
NEW
296
            };
×
297
        }
298

299
        self.candidates.insert(candidate.address, candidate);
2✔
300
    }
2✔
301

302
    /// Select a node that
303
    /// - is not us,
304
    /// - is no peer, and
305
    /// - if we are well-connected, is not a bootstrap node.
306
    ///
307
    /// Favors peers with a large distance and those with IP addresses we are
308
    /// not already connected to.
309
    ///
310
    /// The decision whether a potential peer is identical to the current node
311
    /// is made _only_ based on the [`InstanceId`]. This means that different
312
    /// nodes that run on the same computer _can_ connect to each other.
313
    ///
314
    /// This method requires (immutable) access to the [`NetworkingState`],
315
    /// which in turn requires the caller of this function to acquire a read
316
    /// lock of the `global_state_lock`. This design is a tradeoff; the
317
    /// alternative is to clone the various fields and then release the lock.
318
    //
319
    // A note on the design: The method uses various filters as well as a way to
320
    // compare two peer candidates in a simple iterator-combinator chain to
321
    // select the candidate.
322
    // The design's intention is to simplify future changes: declaring a new
323
    // filter as well as modifying or dropping existing filters has little to no
324
    // impact on other filters. Similarly, modifying the comparison function
325
    // does not require changes to the filters.
326
    fn peer_candidate(&self, networking_state: &NetworkingState) -> Option<PeerCandidate> {
3✔
327
        let is_self = |candidate: &PeerCandidate| candidate.id == networking_state.instance_id;
3✔
328
        let same_id_is_connected = |candidate: &PeerCandidate| {
3✔
329
            networking_state
2✔
330
                .peer_map
2✔
331
                .values()
2✔
332
                .any(|peer| peer.instance_id() == candidate.id)
42✔
333
        };
2✔
334
        let same_socket_is_connected = |candidate: &PeerCandidate| {
3✔
335
            networking_state
2✔
336
                .peer_map
2✔
337
                .values()
2✔
338
                .filter_map(|peer| peer.listen_address())
42✔
339
                .any(|address| address == candidate.address)
42✔
340
        };
2✔
341
        let is_bootstrap_node = |candidate: &PeerCandidate| {
3✔
342
            let candidate_bootstrap_status = networking_state
1✔
343
                .bootstrap_status
1✔
344
                .get(&candidate.address)
1✔
345
                .map(|bootstrap_info| bootstrap_info.status)
1✔
346
                .unwrap_or_default();
1✔
347
            candidate_bootstrap_status == BootstrapStatus::Bootstrap
1✔
348
        };
1✔
349

350
        let curr_num_peers = networking_state.peer_map.len();
3✔
351
        let self_is_well_connected = self.is_well_connected(curr_num_peers);
3✔
352

3✔
353
        self.candidates
3✔
354
            .values()
3✔
355
            .filter(|candidate| !is_self(candidate))
3✔
356
            .filter(|candidate| !same_id_is_connected(candidate))
3✔
357
            .filter(|candidate| !same_socket_is_connected(candidate))
3✔
358
            .filter(|candidate| !(self_is_well_connected && is_bootstrap_node(candidate)))
3✔
359
            .max_by(|l, r| Self::candidate_ordering(networking_state.peer_map.values(), l, r))
3✔
360
            .copied()
3✔
361
    }
3✔
362

363
    fn is_well_connected(&self, curr_num_peers: usize) -> bool {
3✔
364
        /// A node is considered minimally well-connected if it has at least
365
        /// this many peers.
366
        const NUM_PEERS_MINIMALLY_WELL_CONNECTED: usize = 3;
367

368
        /// A node is considered sufficiently well-connected if it has more
369
        /// peers than this ratio of its maximum number of peers.
370
        const SUFFICIENTLY_WELL_CONNECTED_RATIO: f32 = 0.5;
371

372
        let is_minimally_well_connected = curr_num_peers >= NUM_PEERS_MINIMALLY_WELL_CONNECTED;
3✔
373

3✔
374
        let is_sufficiently_well_connected =
3✔
375
            curr_num_peers as f32 > SUFFICIENTLY_WELL_CONNECTED_RATIO * self.max_num_peers as f32;
3✔
376

3✔
377
        is_minimally_well_connected && is_sufficiently_well_connected
3✔
378
    }
3✔
379

380
    /// Order candidates by connection status of their IP address and distance.
381
    ///
382
    /// The resulting [Ordering] is used like in [`Ord::cmp`]. For example,
383
    /// `candidate_ordering(&[], left, right) == `[`Ordering::Less`] means that
384
    /// `left` is the less-suitable candidate.
385
    //
386
    // This is not `impl Ord for PeerCandidate` because it depends on the
387
    // current peers.
NEW
388
    fn candidate_ordering<'pi>(
×
NEW
389
        current_peers: impl Iterator<Item = &'pi PeerInfo> + Clone,
×
NEW
390
        left: &PeerCandidate,
×
NEW
391
        right: &PeerCandidate,
×
NEW
392
    ) -> Ordering {
×
NEW
393
        // Does a connection to the candidate's IP exist?
×
NEW
394
        let ip_is_connected = |candidate: &PeerCandidate| {
×
NEW
395
            current_peers
×
NEW
396
                .clone()
×
NEW
397
                .filter_map(|peer| peer.listen_address())
×
NEW
398
                .any(|address| address.ip() == candidate.address.ip())
×
NEW
399
        };
×
400

NEW
401
        match (ip_is_connected(left), ip_is_connected(right)) {
×
NEW
402
            (true, false) => Ordering::Less,    // prefer `right`
×
NEW
403
            (false, true) => Ordering::Greater, // prefer `left`
×
NEW
404
            _ => left.distance.cmp(&right.distance),
×
405
        }
UNCOV
406
    }
×
407
}
408

409
/// Return a boolean indicating if synchronization mode should be left
410
fn stay_in_sync_mode(
×
411
    own_block_tip_header: &BlockHeader,
×
412
    sync_state: &SyncState,
×
413
    sync_mode_threshold: usize,
×
414
) -> bool {
×
415
    let max_claimed_pow = sync_state
×
416
        .peer_sync_states
×
417
        .values()
×
418
        .max_by_key(|x| x.claimed_max_pow);
×
419
    match max_claimed_pow {
×
420
        None => false, // No peer have passed the sync challenge phase.
×
421

422
        // Synchronization is left when the remaining number of block is half of what has
423
        // been indicated to fit into RAM
424
        Some(max_claim) => {
×
425
            own_block_tip_header.cumulative_proof_of_work < max_claim.claimed_max_pow
×
426
                && max_claim.claimed_max_height - own_block_tip_header.height
×
427
                    > sync_mode_threshold as i128 / 2
×
428
        }
429
    }
430
}
×
431

432
impl MainLoopHandler {
433
    pub(crate) fn new(
10✔
434
        incoming_peer_listener: TcpListener,
10✔
435
        global_state_lock: GlobalStateLock,
10✔
436
        main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,
10✔
437
        peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,
10✔
438
        main_to_miner_tx: mpsc::Sender<MainToMiner>,
10✔
439
    ) -> Self {
10✔
440
        let maybe_main_to_miner_tx = if global_state_lock.cli().mine() {
10✔
441
            Some(main_to_miner_tx)
×
442
        } else {
443
            None
10✔
444
        };
445
        Self {
10✔
446
            incoming_peer_listener,
10✔
447
            global_state_lock,
10✔
448
            main_to_miner_tx: MainToMinerChannel(maybe_main_to_miner_tx),
10✔
449
            main_to_peer_broadcast_tx,
10✔
450
            peer_task_to_main_tx,
10✔
451
            #[cfg(test)]
10✔
452
            mock_now: None,
10✔
453
        }
10✔
454
    }
10✔
455

456
    /// Allows for mocked timestamps such that time dependencies may be tested.
457
    #[cfg(test)]
458
    fn with_mocked_time(mut self, mocked_time: SystemTime) -> Self {
3✔
459
        self.mock_now = Some(mocked_time);
3✔
460
        self
3✔
461
    }
3✔
462

463
    fn now(&self) -> SystemTime {
7✔
464
        #[cfg(not(test))]
7✔
465
        {
7✔
466
            SystemTime::now()
7✔
467
        }
7✔
468
        #[cfg(test)]
7✔
469
        {
7✔
470
            self.mock_now.unwrap_or(SystemTime::now())
7✔
471
        }
7✔
472
    }
7✔
473

474
    /// Run a list of Triton VM prover jobs that update the mutator set state
475
    /// for transactions.
476
    ///
477
    /// Sends the result back through the provided channel.
478
    async fn update_mempool_jobs(
×
479
        update_jobs: Vec<UpdateMutatorSetDataJob>,
×
480
        job_queue: &TritonVmJobQueue,
×
481
        transaction_update_sender: mpsc::Sender<Vec<Transaction>>,
×
482
        proof_job_options: TritonVmProofJobOptions,
×
483
    ) {
×
484
        debug!(
×
485
            "Attempting to update transaction proofs of {} transactions",
×
486
            update_jobs.len()
×
487
        );
488
        let mut result = vec![];
×
489
        for job in update_jobs {
×
490
            // Jobs for updating txs in the mempool have highest priority since
491
            // they block the composer from continuing.
492
            // TODO: Handle errors better here.
493
            let job_result = job
×
494
                .upgrade(job_queue, proof_job_options.clone())
×
495
                .await
×
496
                .unwrap();
×
497
            result.push(job_result);
×
498
        }
499

500
        transaction_update_sender
×
501
            .send(result)
×
502
            .await
×
503
            .expect("Receiver for updated txs in main loop must still exist");
×
504
    }
×
505

506
    /// Handles a list of transactions whose proof has been updated with new
507
    /// mutator set data.
508
    async fn handle_updated_mempool_txs(&mut self, updated_txs: Vec<Transaction>) {
×
509
        // Update mempool with updated transactions
510
        {
511
            let mut state = self.global_state_lock.lock_guard_mut().await;
×
512
            for updated in &updated_txs {
×
513
                let txid = updated.kernel.txid();
×
514
                if let Some(tx) = state.mempool.get_mut(txid) {
×
515
                    *tx = updated.to_owned();
×
516
                } else {
×
517
                    warn!("Updated transaction which is no longer in mempool");
×
518
                }
519
            }
520
        }
521

522
        // Then notify all peers
523
        for updated in updated_txs {
×
524
            self.main_to_peer_broadcast_tx
×
525
                .send(MainToPeerTask::TransactionNotification(
×
526
                    (&updated).try_into().unwrap(),
×
527
                ))
×
528
                .unwrap();
×
529
        }
×
530

531
        // Tell miner that it can now start composing next block.
532
        self.main_to_miner_tx.send(MainToMiner::Continue);
×
533
    }
×
534

535
    /// Process a block whose PoW solution was solved by this client (or an
536
    /// external program) and has not been seen by the rest of the network yet.
537
    ///
538
    /// Shares block with all connected peers, updates own state, and updates
539
    /// any mempool transactions to be valid under this new block.
540
    ///
541
    /// Locking:
542
    ///  * acquires `global_state_lock` for write
543
    async fn handle_self_guessed_block(
1✔
544
        &mut self,
1✔
545
        main_loop_state: &mut MutableMainLoopState,
1✔
546
        new_block: Box<Block>,
1✔
547
    ) -> Result<()> {
1✔
548
        let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
1✔
549

550
        if !global_state_mut.incoming_block_is_more_canonical(&new_block) {
1✔
551
            drop(global_state_mut); // don't hold across send()
×
552
            warn!("Got new block from miner that was not child of tip. Discarding.");
×
553
            self.main_to_miner_tx.send(MainToMiner::Continue);
×
554
            return Ok(());
×
555
        }
1✔
556
        info!("Locally-mined block is new tip: {}", new_block.hash());
1✔
557

558
        // Share block with peers first thing.
559
        info!("broadcasting new block to peers");
1✔
560
        self.main_to_peer_broadcast_tx
1✔
561
            .send(MainToPeerTask::Block(new_block.clone()))
1✔
562
            .expect("Peer handler broadcast channel prematurely closed.");
1✔
563

564
        let update_jobs = global_state_mut.set_new_tip(*new_block).await?;
1✔
565
        drop(global_state_mut);
1✔
566

1✔
567
        self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);
1✔
568

1✔
569
        Ok(())
1✔
570
    }
1✔
571

572
    /// Locking:
573
    ///   * acquires `global_state_lock` for write
574
    async fn handle_miner_task_message(
×
575
        &mut self,
×
576
        msg: MinerToMain,
×
577
        main_loop_state: &mut MutableMainLoopState,
×
578
    ) -> Result<Option<i32>> {
×
579
        match msg {
×
580
            MinerToMain::NewBlockFound(new_block_info) => {
×
581
                log_slow_scope!(fn_name!() + "::MinerToMain::NewBlockFound");
×
582

×
583
                let new_block = new_block_info.block;
×
584

×
585
                info!("Miner found new block: {}", new_block.kernel.header.height);
×
586
                self.handle_self_guessed_block(main_loop_state, new_block)
×
587
                    .await?;
×
588
            }
589
            MinerToMain::BlockProposal(boxed_proposal) => {
×
590
                let (block, expected_utxos) = *boxed_proposal;
×
591

592
                // If block proposal from miner does not build on current tip,
593
                // don't broadcast it. This check covers reorgs as well.
594
                let current_tip = self
×
595
                    .global_state_lock
×
596
                    .lock_guard()
×
597
                    .await
×
598
                    .chain
599
                    .light_state()
×
600
                    .clone();
×
601
                if block.header().prev_block_digest != current_tip.hash() {
×
602
                    warn!(
×
603
                        "Got block proposal from miner that does not build on current tip. \
×
604
                           Rejecting. If this happens a lot, then maybe this machine is too \
×
605
                           slow to competitively compose blocks. Consider running the client only \
×
606
                           with the guesser flag set and not the compose flag."
×
607
                    );
608
                    self.main_to_miner_tx.send(MainToMiner::Continue);
×
609
                    return Ok(None);
×
610
                }
×
611

×
612
                // Ensure proposal validity before sharing
×
613
                if !block.is_valid(&current_tip, block.header().timestamp).await {
×
614
                    error!("Own block proposal invalid. This should not happen.");
×
615
                    self.main_to_miner_tx.send(MainToMiner::Continue);
×
616
                    return Ok(None);
×
617
                }
×
618

×
619
                if !self.global_state_lock.cli().secret_compositions {
×
620
                    self.main_to_peer_broadcast_tx
×
621
                    .send(MainToPeerTask::BlockProposalNotification((&block).into()))
×
622
                    .expect(
×
623
                        "Peer handler broadcast channel prematurely closed. This should never happen.",
×
624
                    );
×
625
                }
×
626

627
                {
628
                    // Use block proposal and add expected UTXOs from this
629
                    // proposal.
630
                    let mut state = self.global_state_lock.lock_guard_mut().await;
×
631
                    state.mining_state.block_proposal =
×
632
                        BlockProposal::own_proposal(block.clone(), expected_utxos.clone());
×
633
                    state.wallet_state.add_expected_utxos(expected_utxos).await;
×
634
                }
635

636
                // Indicate to miner that block proposal was successfully
637
                // received by main-loop.
638
                self.main_to_miner_tx.send(MainToMiner::Continue);
×
639
            }
640
            MinerToMain::Shutdown(exit_code) => {
×
641
                return Ok(Some(exit_code));
×
642
            }
643
        }
644

645
        Ok(None)
×
646
    }
×
647

648
    /// Locking:
649
    ///   * acquires `global_state_lock` for write
650
    async fn handle_peer_task_message(
1✔
651
        &mut self,
1✔
652
        msg: PeerTaskToMain,
1✔
653
        main_loop_state: &mut MutableMainLoopState,
1✔
654
    ) -> Result<()> {
1✔
655
        debug!("Received {} from a peer task", msg.get_type());
1✔
656
        let cli_args = self.global_state_lock.cli().clone();
1✔
657
        match msg {
1✔
658
            PeerTaskToMain::NewBlocks(blocks) => {
×
659
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::NewBlocks");
×
660

×
661
                let last_block = blocks.last().unwrap().to_owned();
×
662
                let update_jobs = {
×
663
                    // The peer tasks also check this condition, if block is more canonical than current
664
                    // tip, but we have to check it again since the block update might have already been applied
665
                    // through a message from another peer (or from own miner).
666
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
667
                    let new_canonical =
×
668
                        global_state_mut.incoming_block_is_more_canonical(&last_block);
×
669

×
670
                    if !new_canonical {
×
671
                        // The blocks are not canonical, but: if we are in sync
672
                        // mode and these blocks beat our current champion, then
673
                        // we store them anyway, without marking them as tip.
674
                        let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else {
×
675
                            warn!(
×
676
                                "Blocks were not new, and we're not syncing. Not storing blocks."
×
677
                            );
678
                            return Ok(());
×
679
                        };
680
                        if sync_anchor
×
681
                            .champion
×
682
                            .is_some_and(|(height, _)| height >= last_block.header().height)
×
683
                        {
684
                            warn!("Repeated blocks received in sync mode, not storing");
×
685
                            return Ok(());
×
686
                        }
×
687

×
688
                        sync_anchor.catch_up(last_block.header().height, last_block.hash());
×
689

690
                        for block in blocks {
×
691
                            global_state_mut.store_block_not_tip(block).await?;
×
692
                        }
693

694
                        return Ok(());
×
695
                    }
×
696

×
697
                    info!(
×
698
                        "Last block from peer is new canonical tip: {}; height: {}",
×
699
                        last_block.hash(),
×
700
                        last_block.header().height
×
701
                    );
702

703
                    // Ask miner to stop work until state update is completed
704
                    self.main_to_miner_tx.send(MainToMiner::WaitForContinue);
×
705

×
706
                    // Get out of sync mode if needed
×
707
                    if global_state_mut.net.sync_anchor.is_some() {
×
708
                        let stay_in_sync_mode = stay_in_sync_mode(
×
709
                            &last_block.kernel.header,
×
710
                            &main_loop_state.sync_state,
×
711
                            cli_args.sync_mode_threshold,
×
712
                        );
×
713
                        if !stay_in_sync_mode {
×
714
                            info!("Exiting sync mode");
×
715
                            global_state_mut.net.sync_anchor = None;
×
716
                            self.main_to_miner_tx.send(MainToMiner::StopSyncing);
×
717
                        }
×
718
                    }
×
719

720
                    let mut update_jobs: Vec<UpdateMutatorSetDataJob> = vec![];
×
721
                    for new_block in blocks {
×
722
                        debug!(
×
723
                            "Storing block {} in database. Height: {}, Mined: {}",
×
724
                            new_block.hash(),
×
725
                            new_block.kernel.header.height,
×
726
                            new_block.kernel.header.timestamp.standard_format()
×
727
                        );
728

729
                        // Potential race condition here.
730
                        // What if last block is new and canonical, but first
731
                        // block is already known then we'll store the same block
732
                        // twice. That should be OK though, as the appropriate
733
                        // database entries are simply overwritten with the new
734
                        // block info. See the
735
                        // [GlobalState::test::setting_same_tip_twice_is_allowed]
736
                        // test for a test of this phenomenon.
737

738
                        let update_jobs_ = global_state_mut.set_new_tip(new_block).await?;
×
739
                        update_jobs.extend(update_jobs_);
×
740
                    }
741

742
                    update_jobs
×
743
                };
×
744

×
745
                // Inform all peers about new block
×
746
                self.main_to_peer_broadcast_tx
×
747
                    .send(MainToPeerTask::Block(Box::new(last_block.clone())))
×
748
                    .expect("Peer handler broadcast was closed. This should never happen");
×
749

×
750
                // Spawn task to handle mempool tx-updating after new blocks.
×
751
                // TODO: Do clever trick to collapse all jobs relating to the same transaction,
×
752
                //       identified by transaction-ID, into *one* update job.
×
753
                self.spawn_mempool_txs_update_job(main_loop_state, update_jobs);
×
754

×
755
                // Inform miner about new block.
×
756
                self.main_to_miner_tx.send(MainToMiner::NewBlock);
×
757
            }
758
            PeerTaskToMain::AddPeerMaxBlockHeight {
759
                peer_address,
×
760
                claimed_height,
×
761
                claimed_cumulative_pow,
×
762
                claimed_block_mmra,
×
763
            } => {
×
764
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight");
×
765

×
766
                let claimed_state =
×
767
                    PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow);
×
768
                main_loop_state
×
769
                    .sync_state
×
770
                    .peer_sync_states
×
771
                    .insert(peer_address, claimed_state);
×
772

773
                // Check if synchronization mode should be activated.
774
                // Synchronization mode is entered if accumulated PoW exceeds
775
                // our tip and if the height difference is positive and beyond
776
                // a threshold value.
777
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
778
                if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow)
×
779
                    && global_state_mut
×
780
                        .net
×
781
                        .sync_anchor
×
782
                        .as_ref()
×
783
                        .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow)
×
784
                {
785
                    info!(
×
786
                        "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}",
×
787
                        peer_address, claimed_height, claimed_cumulative_pow
788
                    );
789
                    global_state_mut.net.sync_anchor =
×
790
                        Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra));
×
791
                    self.main_to_miner_tx.send(MainToMiner::StartSyncing);
×
792
                }
×
793
            }
794
            PeerTaskToMain::RemovePeerMaxBlockHeight(socket_addr) => {
×
795
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::RemovePeerMaxBlockHeight");
×
796

×
797
                debug!(
×
798
                    "Removing max block height from sync data structure for peer {}",
×
799
                    socket_addr
800
                );
801
                main_loop_state
×
802
                    .sync_state
×
803
                    .peer_sync_states
×
804
                    .remove(&socket_addr);
×
805

806
                // Get out of sync mode if needed.
807
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
808

809
                if global_state_mut.net.sync_anchor.is_some() {
×
810
                    let stay_in_sync_mode = stay_in_sync_mode(
×
811
                        global_state_mut.chain.light_state().header(),
×
812
                        &main_loop_state.sync_state,
×
813
                        cli_args.sync_mode_threshold,
×
814
                    );
×
815
                    if !stay_in_sync_mode {
×
816
                        info!("Exiting sync mode");
×
817
                        global_state_mut.net.sync_anchor = None;
×
818
                    }
×
819
                }
×
820
            }
NEW
821
            PeerTaskToMain::PeerDiscoveryAnswer(potential_peers, distance) => {
×
822
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::PeerDiscoveryAnswer");
×
NEW
823
                for (address, id) in potential_peers {
×
NEW
824
                    let candidate = PeerCandidate::new(address, id, distance);
×
NEW
825
                    main_loop_state.potential_peers.add(candidate);
×
UNCOV
826
                }
×
827
            }
828
            PeerTaskToMain::Transaction(pt2m_transaction) => {
×
829
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::Transaction");
×
830

×
831
                debug!(
×
832
                    "`peer_loop` received following transaction from peer. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
833
                    pt2m_transaction.transaction.kernel.inputs.len(),
×
834
                    pt2m_transaction.transaction.kernel.outputs.len(),
×
835
                    pt2m_transaction.transaction.kernel.mutator_set_hash
×
836
                );
837

838
                let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
839
                if pt2m_transaction.confirmable_for_block
×
840
                    != global_state_mut.chain.light_state().hash()
×
841
                {
842
                    warn!("main loop got unmined transaction with bad mutator set data, discarding transaction");
×
843
                    return Ok(());
×
844
                }
×
845

×
846
                // Insert into mempool
×
847
                global_state_mut
×
848
                    .mempool_insert(
×
849
                        pt2m_transaction.transaction.to_owned(),
×
850
                        TransactionOrigin::Foreign,
×
851
                    )
×
852
                    .await;
×
853

854
                // send notification to peers
855
                let transaction_notification: TransactionNotification =
×
856
                    (&pt2m_transaction.transaction).try_into()?;
×
857
                self.main_to_peer_broadcast_tx
×
858
                    .send(MainToPeerTask::TransactionNotification(
×
859
                        transaction_notification,
×
860
                    ))?;
×
861
            }
862
            PeerTaskToMain::BlockProposal(block) => {
×
863
                log_slow_scope!(fn_name!() + "::PeerTaskToMain::BlockProposal");
×
864

×
865
                debug!("main loop received block proposal from peer loop");
×
866

867
                // Due to race-conditions, we need to verify that this
868
                // block proposal is still the immediate child of tip. If it is,
869
                // and it has a higher guesser fee than what we're currently
870
                // working on, then we switch to this, and notify the miner to
871
                // mine on this new block. We don't need to verify the block's
872
                // validity, since that was done in peer loop.
873
                // To ensure atomicity, a write-lock must be held over global
874
                // state while we check if this proposal is favorable.
875
                {
876
                    info!("Received new favorable block proposal for mining operation.");
×
877
                    let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
×
878
                    let verdict = global_state_mut.favor_incoming_block_proposal(
×
879
                        block.header().height,
×
880
                        block.total_guesser_reward(),
×
881
                    );
×
882
                    if let Err(reject_reason) = verdict {
×
883
                        warn!("main loop got unfavorable block proposal. Reason: {reject_reason}");
×
884
                        return Ok(());
×
885
                    }
×
886

×
887
                    global_state_mut.mining_state.block_proposal =
×
888
                        BlockProposal::foreign_proposal(*block.clone());
×
889
                }
×
890

×
891
                // Notify all peers of the block proposal we just accepted
×
892
                self.main_to_peer_broadcast_tx
×
893
                    .send(MainToPeerTask::BlockProposalNotification((&*block).into()))?;
×
894

895
                self.main_to_miner_tx.send(MainToMiner::NewBlockProposal);
×
896
            }
897
            PeerTaskToMain::DisconnectFromLongestLivedPeer => {
898
                let global_state = self.global_state_lock.lock_guard().await;
1✔
899

900
                // get all peers
901
                let all_peers = global_state.net.peer_map.iter();
1✔
902

1✔
903
                // filter out CLI peers
1✔
904
                let disconnect_candidates =
1✔
905
                    all_peers.filter(|p| !global_state.cli_peers().contains(p.0));
5✔
906

1✔
907
                // find the one with the oldest connection
1✔
908
                let longest_lived_peer = disconnect_candidates.min_by(
1✔
909
                    |(_socket_address_left, peer_info_left),
1✔
910
                     (_socket_address_right, peer_info_right)| {
4✔
911
                        peer_info_left
4✔
912
                            .connection_established()
4✔
913
                            .cmp(&peer_info_right.connection_established())
4✔
914
                    },
4✔
915
                );
1✔
916

917
                // tell to disconnect
918
                if let Some((peer_socket, _peer_info)) = longest_lived_peer {
1✔
919
                    self.main_to_peer_broadcast_tx
1✔
920
                        .send(MainToPeerTask::Disconnect(peer_socket.to_owned()))?;
1✔
921
                }
×
922
            }
923
        }
924

925
        Ok(())
1✔
926
    }
1✔
927

928
    /// If necessary, disconnect from peers.
929
    ///
930
    /// While a reasonable effort is made to never have more connections than
931
    /// [`max_num_peers`](crate::config_models::cli_args::Args::max_num_peers),
932
    /// this is not guaranteed. For example, bootstrap nodes temporarily allow a
933
    /// surplus of incoming connections to provide their service more reliably.
934
    ///
935
    /// Never disconnects peers listed as CLI arguments.
936
    ///
937
    /// Locking:
938
    ///   * acquires `global_state_lock` for read
939
    async fn prune_peers(&self) -> Result<()> {
2✔
940
        // fetch all relevant info from global state; don't hold the lock
2✔
941
        let cli_args = self.global_state_lock.cli();
2✔
942
        let connected_peers = self
2✔
943
            .global_state_lock
2✔
944
            .lock_guard()
2✔
945
            .await
2✔
946
            .net
947
            .peer_map
948
            .values()
2✔
949
            .cloned()
2✔
950
            .collect_vec();
2✔
951

2✔
952
        let num_peers = connected_peers.len();
2✔
953
        let max_num_peers = cli_args.max_num_peers;
2✔
954
        if num_peers <= max_num_peers {
2✔
955
            debug!("No need to prune any peer connections.");
1✔
956
            return Ok(());
1✔
957
        }
1✔
958
        warn!("Connected to {num_peers} peers, which exceeds the maximum ({max_num_peers}).");
1✔
959

960
        // If all connections are outbound, it's OK to exceed the max.
961
        if connected_peers.iter().all(|p| p.connection_is_outbound()) {
8✔
962
            warn!("Not disconnecting from any peer because all connections are outbound.");
×
963
            return Ok(());
×
964
        }
1✔
965

1✔
966
        let num_peers_to_disconnect = num_peers - max_num_peers;
1✔
967
        let peers_to_disconnect = connected_peers
1✔
968
            .into_iter()
1✔
969
            .filter(|peer| !cli_args.peers.contains(&peer.connected_address()))
14✔
970
            .choose_multiple(&mut rand::rng(), num_peers_to_disconnect);
1✔
971
        match peers_to_disconnect.len() {
1✔
972
            0 => warn!("Not disconnecting from any peer because of manual override."),
×
973
            i => info!("Disconnecting from {i} peers."),
1✔
974
        }
975
        for peer in peers_to_disconnect {
5✔
976
            self.main_to_peer_broadcast_tx
4✔
977
                .send(MainToPeerTask::Disconnect(peer.connected_address()))?;
4✔
978
        }
979

980
        Ok(())
1✔
981
    }
2✔
982

983
    /// If necessary, reconnect to the peers listed as CLI arguments.
984
    ///
985
    /// Locking:
986
    ///   * acquires `global_state_lock` for read
987
    async fn reconnect(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
×
988
        let connected_peers = self
×
989
            .global_state_lock
×
990
            .lock_guard()
×
991
            .await
×
992
            .net
993
            .peer_map
994
            .keys()
×
995
            .copied()
×
996
            .collect_vec();
×
997
        let peers_with_lost_connection = self
×
998
            .global_state_lock
×
999
            .cli()
×
1000
            .peers
×
1001
            .iter()
×
1002
            .filter(|peer| !connected_peers.contains(peer));
×
1003

×
1004
        // If no connection was lost, there's nothing to do.
×
1005
        if peers_with_lost_connection.clone().count() == 0 {
×
1006
            return Ok(());
×
1007
        }
×
1008

1009
        // Else, try to reconnect.
1010
        let own_handshake_data = self
×
1011
            .global_state_lock
×
1012
            .lock_guard()
×
1013
            .await
×
1014
            .get_own_handshakedata();
×
1015
        for &peer_with_lost_connection in peers_with_lost_connection {
×
1016
            // Disallow reconnection if peer is in bad standing
1017
            let peer_standing = self
×
1018
                .global_state_lock
×
1019
                .lock_guard()
×
1020
                .await
×
1021
                .net
1022
                .get_peer_standing_from_database(peer_with_lost_connection.ip())
×
1023
                .await;
×
1024
            if peer_standing.is_some_and(|standing| standing.is_bad()) {
×
1025
                info!("Not reconnecting to peer in bad standing: {peer_with_lost_connection}");
×
1026
                continue;
×
1027
            }
×
1028

×
1029
            info!("Attempting to reconnect to peer: {peer_with_lost_connection}");
×
1030
            let global_state_lock = self.global_state_lock.clone();
×
1031
            let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
×
1032
            let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
×
1033
            let own_handshake_data = own_handshake_data.clone();
×
1034
            let outgoing_connection_task = tokio::task::Builder::new()
×
1035
                .name("call_peer_wrapper_1")
×
1036
                .spawn(async move {
×
1037
                    call_peer(
×
1038
                        peer_with_lost_connection,
×
1039
                        global_state_lock,
×
1040
                        main_to_peer_broadcast_rx,
×
1041
                        peer_task_to_main_tx,
×
1042
                        own_handshake_data,
×
1043
                        1, // All CLI-specified peers have distance 1
×
1044
                    )
×
1045
                    .await;
×
1046
                })?;
×
1047
            main_loop_state.task_handles.push(outgoing_connection_task);
×
1048
            main_loop_state.task_handles.retain(|th| !th.is_finished());
×
1049
        }
×
1050

1051
        Ok(())
×
1052
    }
×
1053

1054
    /// Perform peer discovery.
1055
    ///
1056
    /// Peer discovery involves finding potential peers from connected peers
1057
    /// and attempts to establish a connection with one of them.
1058
    ///
1059
    /// Locking:
1060
    ///   * acquires `global_state_lock` for read
1061
    async fn discover_peers(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
2✔
1062
        // fetch all relevant info from global state, then release the lock
2✔
1063
        let cli_args = self.global_state_lock.cli();
2✔
1064
        let global_state = self.global_state_lock.lock_guard().await;
2✔
1065
        let num_peers = global_state.net.peer_map.len();
2✔
1066
        let own_handshake_data = global_state.get_own_handshakedata();
2✔
1067
        drop(global_state);
2✔
1068

2✔
1069
        let max_num_peers = cli_args.max_num_peers;
2✔
1070

2✔
1071
        // Don't make an outgoing connection if
2✔
1072
        // - the peer limit is reached (or exceeded), or
2✔
1073
        // - the peer limit is _almost_ reached; reserve the last slot for an
2✔
1074
        //   incoming connection.
2✔
1075
        if num_peers >= max_num_peers || num_peers > 2 && num_peers - 1 == max_num_peers {
2✔
1076
            info!("Connected to {num_peers} peers. The configured max is {max_num_peers} peers.");
1✔
1077
            info!("Skipping peer discovery.");
1✔
1078
            return Ok(());
1✔
1079
        }
1✔
1080

1✔
1081
        info!("Performing peer discovery");
1✔
1082

1083
        // Ask all peers for their peer lists. This will eventually – once the
1084
        // responses have come in – update the list of potential peers.
1085
        self.main_to_peer_broadcast_tx
1✔
1086
            .send(MainToPeerTask::MakePeerDiscoveryRequest)?;
1✔
1087

1088
        // Get a peer candidate from the list of potential peers. Generally,
1089
        // the peer lists requested in the previous step will not have come in
1090
        // yet. Therefore, the new candidate is selected based on somewhat
1091
        // (but not overly) old information.
1092
        let Some(peer_candidate) = main_loop_state
1✔
1093
            .potential_peers
1✔
1094
            .peer_candidate(&self.global_state_lock.lock_guard().await.net)
1✔
1095
        else {
1096
            info!("Found no peer candidate to connect to. Not making new connection.");
1✔
1097
            return Ok(());
1✔
1098
        };
1099

1100
        // Try to connect to the selected candidate.
NEW
1101
        info!(
×
NEW
1102
            "Connecting to peer {address} with distance {distance}",
×
1103
            address = peer_candidate.address,
1104
            distance = peer_candidate.distance
1105
        );
1106
        let global_state_lock = self.global_state_lock.clone();
×
1107
        let main_to_peer_broadcast_rx = self.main_to_peer_broadcast_tx.subscribe();
×
1108
        let peer_task_to_main_tx = self.peer_task_to_main_tx.to_owned();
×
1109
        let outgoing_connection_task = tokio::task::Builder::new()
×
NEW
1110
            .name("call_peer_for_new_connection_from_peer_discovery")
×
1111
            .spawn(async move {
×
1112
                call_peer(
×
NEW
1113
                    peer_candidate.address,
×
1114
                    global_state_lock,
×
1115
                    main_to_peer_broadcast_rx,
×
1116
                    peer_task_to_main_tx,
×
1117
                    own_handshake_data,
×
NEW
1118
                    peer_candidate.distance,
×
1119
                )
×
1120
                .await;
×
1121
            })?;
×
1122
        main_loop_state.task_handles.push(outgoing_connection_task);
×
1123
        main_loop_state.task_handles.retain(|th| !th.is_finished());
×
1124

×
1125
        // Immediately request the new peer's peer list. This allows
×
1126
        // incorporating the new peer's peers into the list of potential peers,
×
1127
        // to be used in the next round of peer discovery.
×
NEW
1128
        let peer_discovery_request =
×
NEW
1129
            MainToPeerTask::MakeSpecificPeerDiscoveryRequest(peer_candidate.address);
×
1130
        self.main_to_peer_broadcast_tx
×
NEW
1131
            .send(peer_discovery_request)?;
×
1132

1133
        Ok(())
×
1134
    }
2✔
1135

1136
    /// Return a list of block heights for a block-batch request.
1137
    ///
1138
    /// Returns an ordered list of the heights of *most preferred block*
1139
    /// to build on, where current tip is always the most preferred block.
1140
    ///
1141
    /// Uses a factor to ensure that the peer will always have something to
1142
    /// build on top of by providing potential starting points all the way
1143
    /// back to genesis.
1144
    fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec<BlockHeight> {
258✔
1145
        const FACTOR: f64 = 1.07f64;
1146

1147
        let mut look_behind = 0;
258✔
1148
        let mut ret = vec![];
258✔
1149

1150
        // A factor of 1.07 can look back ~1m blocks in 200 digests.
1151
        while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 {
51,374✔
1152
            let height = match own_tip_height.checked_sub(look_behind) {
51,118✔
1153
                None => break,
1✔
1154
                Some(height) if height.is_genesis() => break,
51,117✔
1155
                Some(height) => height,
51,116✔
1156
            };
51,116✔
1157

51,116✔
1158
            ret.push(height);
51,116✔
1159
            look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64;
51,116✔
1160
        }
1161

1162
        ret.push(BlockHeight::genesis());
258✔
1163

258✔
1164
        ret
258✔
1165
    }
258✔
1166

1167
    /// Logic for requesting the batch-download of blocks from peers
1168
    ///
1169
    /// Locking:
1170
    ///   * acquires `global_state_lock` for read
1171
    async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
3✔
1172
        let global_state = self.global_state_lock.lock_guard().await;
3✔
1173

1174
        // Check if we are in sync mode
1175
        let Some(anchor) = &global_state.net.sync_anchor else {
3✔
1176
            return Ok(());
1✔
1177
        };
1178

1179
        info!("Running sync");
2✔
1180

1181
        let (own_tip_hash, own_tip_height, own_cumulative_pow) = (
2✔
1182
            global_state.chain.light_state().hash(),
2✔
1183
            global_state.chain.light_state().kernel.header.height,
2✔
1184
            global_state
2✔
1185
                .chain
2✔
1186
                .light_state()
2✔
1187
                .kernel
2✔
1188
                .header
2✔
1189
                .cumulative_proof_of_work,
2✔
1190
        );
2✔
1191

2✔
1192
        // Check if sync mode has timed out entirely, in which case it should
2✔
1193
        // be abandoned.
2✔
1194
        let anchor = anchor.to_owned();
2✔
1195
        if self.now().duration_since(anchor.updated)?.as_secs()
2✔
1196
            > GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS
1197
        {
1198
            warn!("Sync mode has timed out. Abandoning sync mode.");
1✔
1199

1200
            // Abandon attempt, and punish all peers claiming to serve these
1201
            // blocks.
1202
            drop(global_state);
1✔
1203
            self.global_state_lock
1✔
1204
                .lock_guard_mut()
1✔
1205
                .await
1✔
1206
                .net
1✔
1207
                .sync_anchor = None;
1✔
1208

1209
            let peers_to_punish = main_loop_state
1✔
1210
                .sync_state
1✔
1211
                .get_potential_peers_for_sync_request(own_cumulative_pow);
1✔
1212

1213
            for peer in peers_to_punish {
2✔
1214
                self.main_to_peer_broadcast_tx
1✔
1215
                    .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?;
1✔
1216
            }
1217

1218
            return Ok(());
1✔
1219
        }
1✔
1220

1✔
1221
        let (peer_to_sanction, try_new_request): (Option<SocketAddr>, bool) = main_loop_state
1✔
1222
            .sync_state
1✔
1223
            .get_status_of_last_request(own_tip_height, self.now());
1✔
1224

1225
        // Sanction peer if they failed to respond
1226
        if let Some(peer) = peer_to_sanction {
1✔
1227
            self.main_to_peer_broadcast_tx
×
1228
                .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?;
×
1229
        }
1✔
1230

1231
        if !try_new_request {
1✔
1232
            info!("Waiting for last sync to complete.");
×
1233
            return Ok(());
×
1234
        }
1✔
1235

1✔
1236
        // Create the next request from the reported
1✔
1237
        info!("Creating new sync request");
1✔
1238

1239
        // Pick a random peer that has reported to have relevant blocks
1240
        let candidate_peers = main_loop_state
1✔
1241
            .sync_state
1✔
1242
            .get_potential_peers_for_sync_request(own_cumulative_pow);
1✔
1243
        let mut rng = rand::rng();
1✔
1244
        let chosen_peer = candidate_peers.choose(&mut rng);
1✔
1245
        assert!(
1✔
1246
            chosen_peer.is_some(),
1✔
1247
            "A synchronization candidate must be available for a request. \
×
1248
            Otherwise, the data structure is in an invalid state and syncing should not be active"
×
1249
        );
1250

1251
        let ordered_preferred_block_digests = match anchor.champion {
1✔
1252
            Some((_height, digest)) => vec![digest],
×
1253
            None => {
1254
                // Find candidate-UCA digests based on a sparse distribution of
1255
                // block heights skewed towards own tip height
1256
                let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height);
1✔
1257
                let mut ordered_preferred_block_digests = vec![];
1✔
1258
                for height in request_heights {
2✔
1259
                    let digest = global_state
1✔
1260
                        .chain
1✔
1261
                        .archival_state()
1✔
1262
                        .archival_block_mmr
1✔
1263
                        .ammr()
1✔
1264
                        .get_leaf_async(height.into())
1✔
1265
                        .await;
1✔
1266
                    ordered_preferred_block_digests.push(digest);
1✔
1267
                }
1268
                ordered_preferred_block_digests
1✔
1269
            }
1270
        };
1271

1272
        // Send message to the relevant peer loop to request the blocks
1273
        let chosen_peer = chosen_peer.unwrap();
1✔
1274
        info!(
1✔
1275
            "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}",
×
1276
            chosen_peer, own_tip_hash, own_tip_height
1277
        );
1278
        self.main_to_peer_broadcast_tx
1✔
1279
            .send(MainToPeerTask::RequestBlockBatch(
1✔
1280
                MainToPeerTaskBatchBlockRequest {
1✔
1281
                    peer_addr_target: *chosen_peer,
1✔
1282
                    known_blocks: ordered_preferred_block_digests,
1✔
1283
                    anchor_mmr: anchor.block_mmr.clone(),
1✔
1284
                },
1✔
1285
            ))
1✔
1286
            .expect("Sending message to peers must succeed");
1✔
1287

1✔
1288
        // Record that this request was sent to the peer
1✔
1289
        let requested_block_height = own_tip_height.next();
1✔
1290
        main_loop_state
1✔
1291
            .sync_state
1✔
1292
            .record_request(requested_block_height, *chosen_peer, self.now());
1✔
1293

1✔
1294
        Ok(())
1✔
1295
    }
3✔
1296

1297
    /// Scheduled task for upgrading the proofs of transactions in the mempool.
1298
    ///
1299
    /// Will either perform a merge of two transactions supported with single
1300
    /// proofs, or will upgrade a transaction proof of the type
1301
    /// `ProofCollection` to `SingleProof`.
1302
    ///
1303
    /// All proving takes place in a spawned task such that it doesn't block
1304
    /// the main loop. The MutableMainLoopState gets the JoinHandle of the
1305
    /// spawned upgrade task such that its status can be expected.
1306
    async fn proof_upgrader(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> {
3✔
1307
        fn attempt_upgrade(
3✔
1308
            global_state: &GlobalState,
3✔
1309
            now: SystemTime,
3✔
1310
            tx_upgrade_interval: Option<Duration>,
3✔
1311
            main_loop_state: &MutableMainLoopState,
3✔
1312
        ) -> Result<bool> {
3✔
1313
            let duration_since_last_upgrade =
3✔
1314
                now.duration_since(global_state.net.last_tx_proof_upgrade_attempt)?;
3✔
1315
            let previous_upgrade_task_is_still_running = main_loop_state
3✔
1316
                .proof_upgrader_task
3✔
1317
                .as_ref()
3✔
1318
                .is_some_and(|x| !x.is_finished());
3✔
1319
            Ok(global_state.net.sync_anchor.is_none()
3✔
1320
                && global_state.proving_capability() == TxProvingCapability::SingleProof
3✔
1321
                && !previous_upgrade_task_is_still_running
3✔
1322
                && tx_upgrade_interval
3✔
1323
                    .is_some_and(|upgrade_interval| duration_since_last_upgrade > upgrade_interval))
3✔
1324
        }
3✔
1325

1326
        trace!("Running proof upgrader scheduled task");
3✔
1327

1328
        // Check if it's time to run the proof-upgrader, and if we're capable
1329
        // of upgrading a transaction proof.
1330
        let tx_upgrade_interval = self.global_state_lock.cli().tx_upgrade_interval();
3✔
1331
        let (upgrade_candidate, tx_origin) = {
1✔
1332
            let global_state = self.global_state_lock.lock_guard().await;
3✔
1333
            let now = self.now();
3✔
1334
            if !attempt_upgrade(&global_state, now, tx_upgrade_interval, main_loop_state)? {
3✔
1335
                trace!("Not attempting upgrade.");
2✔
1336
                return Ok(());
2✔
1337
            }
1✔
1338

1✔
1339
            debug!("Attempting to run transaction-proof-upgrade");
1✔
1340

1341
            // Find a candidate for proof upgrade
1342
            let Some((upgrade_candidate, tx_origin)) = get_upgrade_task_from_mempool(&global_state)
1✔
1343
            else {
1344
                debug!("Found no transaction-proof to upgrade");
×
1345
                return Ok(());
×
1346
            };
1347

1348
            (upgrade_candidate, tx_origin)
1✔
1349
        };
1✔
1350

1✔
1351
        info!(
1✔
1352
            "Attempting to upgrade transaction proofs of: {}",
×
1353
            upgrade_candidate.affected_txids().iter().join("; ")
×
1354
        );
1355

1356
        // Perform the upgrade, if we're not using the prover for anything else,
1357
        // like mining, or proving our own transaction. Running the prover takes
1358
        // a long time (minutes), so we spawn a task for this such that we do
1359
        // not block the main loop.
1360
        let vm_job_queue = self.global_state_lock.vm_job_queue().clone();
1✔
1361
        let perform_ms_update_if_needed =
1✔
1362
            self.global_state_lock.cli().proving_capability() == TxProvingCapability::SingleProof;
1✔
1363

1✔
1364
        let global_state_lock_clone = self.global_state_lock.clone();
1✔
1365
        let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
1✔
1366
        let proof_upgrader_task =
1✔
1367
            tokio::task::Builder::new()
1✔
1368
                .name("proof_upgrader")
1✔
1369
                .spawn(async move {
1✔
1370
                    upgrade_candidate
1✔
1371
                        .handle_upgrade(
1✔
1372
                            &vm_job_queue,
1✔
1373
                            tx_origin,
1✔
1374
                            perform_ms_update_if_needed,
1✔
1375
                            global_state_lock_clone,
1✔
1376
                            main_to_peer_broadcast_tx_clone,
1✔
1377
                        )
1✔
1378
                        .await
1✔
1379
                })?;
1✔
1380

1381
        main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);
1✔
1382

1✔
1383
        Ok(())
1✔
1384
    }
3✔
1385

1386
    /// Post-processing when new block has arrived. Spawn a task to update
1387
    /// transactions in the mempool. Only when the spawned task has completed,
1388
    /// should the miner continue.
1389
    fn spawn_mempool_txs_update_job(
1✔
1390
        &self,
1✔
1391
        main_loop_state: &mut MutableMainLoopState,
1✔
1392
        update_jobs: Vec<UpdateMutatorSetDataJob>,
1✔
1393
    ) {
1✔
1394
        // job completion of the spawned task is communicated through the
1✔
1395
        // `update_mempool_txs_handle` channel.
1✔
1396
        let vm_job_queue = self.global_state_lock.vm_job_queue().clone();
1✔
1397
        if let Some(handle) = main_loop_state.update_mempool_txs_handle.as_ref() {
1✔
1398
            handle.abort();
×
1399
        }
1✔
1400
        let (update_sender, update_receiver) =
1✔
1401
            mpsc::channel::<Vec<Transaction>>(TX_UPDATER_CHANNEL_CAPACITY);
1✔
1402

1✔
1403
        // note: if this task is cancelled, the job will continue
1✔
1404
        // because TritonVmJobOptions::cancel_job_rx is None.
1✔
1405
        // see how compose_task handles cancellation in mine_loop.
1✔
1406
        let job_options = self
1✔
1407
            .global_state_lock
1✔
1408
            .cli()
1✔
1409
            .proof_job_options(TritonVmJobPriority::Highest);
1✔
1410
        main_loop_state.update_mempool_txs_handle = Some(
1✔
1411
            tokio::task::Builder::new()
1✔
1412
                .name("mempool tx ms-updater")
1✔
1413
                .spawn(async move {
1✔
1414
                    Self::update_mempool_jobs(
×
1415
                        update_jobs,
×
1416
                        &vm_job_queue,
×
1417
                        update_sender,
×
1418
                        job_options,
×
1419
                    )
×
1420
                    .await
×
1421
                })
1✔
1422
                .unwrap(),
1✔
1423
        );
1✔
1424
        main_loop_state.update_mempool_receiver = update_receiver;
1✔
1425
    }
1✔
1426

1427
    pub(crate) async fn run(
×
1428
        &mut self,
×
1429
        mut peer_task_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
×
1430
        mut miner_to_main_rx: mpsc::Receiver<MinerToMain>,
×
1431
        mut rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
×
1432
        task_handles: Vec<JoinHandle<()>>,
×
1433
    ) -> Result<i32> {
×
1434
        // Handle incoming connections, messages from peer tasks, and messages from the mining task
×
NEW
1435
        let mut main_loop_state =
×
NEW
1436
            MutableMainLoopState::new(self.global_state_lock.cli(), task_handles);
×
1437

×
1438
        // Set peer discovery to run every N seconds. All timers must be reset
×
1439
        // every time they have run.
×
1440
        let peer_discovery_timer_interval = Duration::from_secs(PEER_DISCOVERY_INTERVAL_IN_SECONDS);
×
1441
        let peer_discovery_timer = time::sleep(peer_discovery_timer_interval);
×
1442
        tokio::pin!(peer_discovery_timer);
×
1443

×
1444
        // Set synchronization to run every M seconds.
×
1445
        let block_sync_interval = Duration::from_secs(SYNC_REQUEST_INTERVAL_IN_SECONDS);
×
1446
        let block_sync_timer = time::sleep(block_sync_interval);
×
1447
        tokio::pin!(block_sync_timer);
×
1448

×
1449
        // Set removal of transactions from mempool.
×
1450
        let mempool_cleanup_interval = Duration::from_secs(MEMPOOL_PRUNE_INTERVAL_IN_SECS);
×
1451
        let mempool_cleanup_timer = time::sleep(mempool_cleanup_interval);
×
1452
        tokio::pin!(mempool_cleanup_timer);
×
1453

×
1454
        // Set removal of stale notifications for incoming UTXOs.
×
1455
        let utxo_notification_cleanup_interval =
×
1456
            Duration::from_secs(EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS);
×
1457
        let utxo_notification_cleanup_timer = time::sleep(utxo_notification_cleanup_interval);
×
1458
        tokio::pin!(utxo_notification_cleanup_timer);
×
1459

×
1460
        // Set restoration of membership proofs to run every Q seconds.
×
1461
        let mp_resync_interval = Duration::from_secs(MP_RESYNC_INTERVAL_IN_SECS);
×
1462
        let mp_resync_timer = time::sleep(mp_resync_interval);
×
1463
        tokio::pin!(mp_resync_timer);
×
1464

×
1465
        // Set transasction-proof-upgrade-checker to run every R secnods.
×
1466
        let tx_proof_upgrade_interval =
×
1467
            Duration::from_secs(TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS);
×
1468
        let tx_proof_upgrade_timer = time::sleep(tx_proof_upgrade_interval);
×
1469
        tokio::pin!(tx_proof_upgrade_timer);
×
1470

×
1471
        // Spawn tasks to monitor for SIGTERM, SIGINT, and SIGQUIT. These
×
1472
        // signals are only used on Unix systems.
×
1473
        let (tx_term, mut rx_term): (mpsc::Sender<()>, mpsc::Receiver<()>) =
×
1474
            tokio::sync::mpsc::channel(2);
×
1475
        let (tx_int, mut rx_int): (mpsc::Sender<()>, mpsc::Receiver<()>) =
×
1476
            tokio::sync::mpsc::channel(2);
×
1477
        let (tx_quit, mut rx_quit): (mpsc::Sender<()>, mpsc::Receiver<()>) =
×
1478
            tokio::sync::mpsc::channel(2);
×
1479
        #[cfg(unix)]
1480
        {
×
1481
            use tokio::signal::unix::signal;
×
1482
            use tokio::signal::unix::SignalKind;
×
1483

1484
            // Monitor for SIGTERM
1485
            let mut sigterm = signal(SignalKind::terminate())?;
×
1486
            tokio::task::Builder::new()
×
1487
                .name("sigterm_handler")
×
1488
                .spawn(async move {
×
1489
                    if sigterm.recv().await.is_some() {
×
1490
                        info!("Received SIGTERM");
×
1491
                        tx_term.send(()).await.unwrap();
×
1492
                    }
×
1493
                })?;
×
1494

1495
            // Monitor for SIGINT
1496
            let mut sigint = signal(SignalKind::interrupt())?;
×
1497
            tokio::task::Builder::new()
×
1498
                .name("sigint_handler")
×
1499
                .spawn(async move {
×
1500
                    if sigint.recv().await.is_some() {
×
1501
                        info!("Received SIGINT");
×
1502
                        tx_int.send(()).await.unwrap();
×
1503
                    }
×
1504
                })?;
×
1505

1506
            // Monitor for SIGQUIT
1507
            let mut sigquit = signal(SignalKind::quit())?;
×
1508
            tokio::task::Builder::new()
×
1509
                .name("sigquit_handler")
×
1510
                .spawn(async move {
×
1511
                    if sigquit.recv().await.is_some() {
×
1512
                        info!("Received SIGQUIT");
×
1513
                        tx_quit.send(()).await.unwrap();
×
1514
                    }
×
1515
                })?;
×
1516
        }
1517

1518
        #[cfg(not(unix))]
1519
        drop((tx_term, tx_int, tx_quit));
1520

1521
        let exit_code: i32 = loop {
×
1522
            select! {
×
1523
                Ok(()) = signal::ctrl_c() => {
×
1524
                    info!("Detected Ctrl+c signal.");
×
1525
                    break SUCCESS_EXIT_CODE;
×
1526
                }
1527

1528
                // Monitor for SIGTERM, SIGINT, and SIGQUIT.
1529
                Some(_) = rx_term.recv() => {
×
1530
                    info!("Detected SIGTERM signal.");
×
1531
                    break SUCCESS_EXIT_CODE;
×
1532
                }
1533
                Some(_) = rx_int.recv() => {
×
1534
                    info!("Detected SIGINT signal.");
×
1535
                    break SUCCESS_EXIT_CODE;
×
1536
                }
1537
                Some(_) = rx_quit.recv() => {
×
1538
                    info!("Detected SIGQUIT signal.");
×
1539
                    break SUCCESS_EXIT_CODE;
×
1540
                }
1541

1542
                // Handle incoming connections from peer
1543
                Ok((stream, peer_address)) = self.incoming_peer_listener.accept() => {
×
1544
                    // Return early if no incoming connections are accepted. Do
1545
                    // not send application-handshake.
1546
                    if self.global_state_lock.cli().disallow_all_incoming_peer_connections() {
×
1547
                        warn!("Got incoming connection despite not accepting any. Ignoring");
×
1548
                        continue;
×
1549
                    }
×
1550

1551
                    let state = self.global_state_lock.lock_guard().await;
×
1552
                    let main_to_peer_broadcast_rx_clone: broadcast::Receiver<MainToPeerTask> = self.main_to_peer_broadcast_tx.subscribe();
×
1553
                    let peer_task_to_main_tx_clone: mpsc::Sender<PeerTaskToMain> = self.peer_task_to_main_tx.clone();
×
1554
                    let own_handshake_data: HandshakeData = state.get_own_handshakedata();
×
1555
                    let global_state_lock = self.global_state_lock.clone(); // bump arc refcount.
×
1556
                    let incoming_peer_task_handle = tokio::task::Builder::new()
×
1557
                        .name("answer_peer_wrapper")
×
1558
                        .spawn(async move {
×
1559
                        match answer_peer(
×
1560
                            stream,
×
1561
                            global_state_lock,
×
1562
                            peer_address,
×
1563
                            main_to_peer_broadcast_rx_clone,
×
1564
                            peer_task_to_main_tx_clone,
×
1565
                            own_handshake_data,
×
1566
                        ).await {
×
1567
                            Ok(()) => (),
×
1568
                            Err(err) => error!("Got error: {:?}", err),
×
1569
                        }
1570
                    })?;
×
1571
                    main_loop_state.task_handles.push(incoming_peer_task_handle);
×
1572
                    main_loop_state.task_handles.retain(|th| !th.is_finished());
×
1573
                }
×
1574

1575
                // Handle messages from peer tasks
1576
                Some(msg) = peer_task_to_main_rx.recv() => {
×
1577
                    debug!("Received message sent to main task.");
×
1578
                    self.handle_peer_task_message(
×
1579
                        msg,
×
1580
                        &mut main_loop_state,
×
1581
                    )
×
1582
                    .await?
×
1583
                }
1584

1585
                // Handle messages from miner task
1586
                Some(main_message) = miner_to_main_rx.recv() => {
×
1587
                    let exit_code = self.handle_miner_task_message(main_message, &mut main_loop_state).await?;
×
1588

1589
                    if let Some(exit_code) = exit_code {
×
1590
                        break exit_code;
×
1591
                    }
×
1592

1593
                }
1594

1595
                // Handle the completion of mempool tx-update jobs after new block.
1596
                Some(ms_updated_transactions) = main_loop_state.update_mempool_receiver.recv() => {
×
1597
                    self.handle_updated_mempool_txs(ms_updated_transactions).await;
×
1598
                }
1599

1600
                // Handle messages from rpc server task
1601
                Some(rpc_server_message) = rpc_server_to_main_rx.recv() => {
×
1602
                    let shutdown_after_execution = self.handle_rpc_server_message(rpc_server_message.clone(), &mut main_loop_state).await?;
×
1603
                    if shutdown_after_execution {
×
1604
                        break SUCCESS_EXIT_CODE
×
1605
                    }
×
1606
                }
1607

1608
                // Handle peer discovery
1609
                _ = &mut peer_discovery_timer => {
×
1610
                    log_slow_scope!(fn_name!() + "::select::peer_discovery_timer");
×
1611

×
1612
                    // Check number of peers we are connected to and connect to more peers
×
1613
                    // if needed.
×
1614
                    debug!("Timer: peer discovery job");
×
1615
                    self.prune_peers().await?;
×
1616
                    self.reconnect(&mut main_loop_state).await?;
×
1617
                    self.discover_peers(&mut main_loop_state).await?;
×
1618

1619
                    // Reset the timer to run this branch again in N seconds
1620
                    peer_discovery_timer.as_mut().reset(tokio::time::Instant::now() + peer_discovery_timer_interval);
×
1621
                }
1622

1623
                // Handle synchronization (i.e. batch-downloading of blocks)
1624
                _ = &mut block_sync_timer => {
×
1625
                    log_slow_scope!(fn_name!() + "::select::block_sync_timer");
×
1626

×
1627
                    trace!("Timer: block-synchronization job");
×
1628
                    self.block_sync(&mut main_loop_state).await?;
×
1629

1630
                    // Reset the timer to run this branch again in M seconds
1631
                    block_sync_timer.as_mut().reset(tokio::time::Instant::now() + block_sync_interval);
×
1632
                }
1633

1634
                // Handle mempool cleanup, i.e. removing stale/too old txs from mempool
1635
                _ = &mut mempool_cleanup_timer => {
×
1636
                    log_slow_scope!(fn_name!() + "::select::mempool_cleanup_timer");
×
1637

×
1638
                    debug!("Timer: mempool-cleaner job");
×
1639
                    self.global_state_lock.lock_guard_mut().await.mempool_prune_stale_transactions().await;
×
1640

1641
                    // Reset the timer to run this branch again in P seconds
1642
                    mempool_cleanup_timer.as_mut().reset(tokio::time::Instant::now() + mempool_cleanup_interval);
×
1643
                }
1644

1645
                // Handle incoming UTXO notification cleanup, i.e. removing stale/too old UTXO notification from pool
1646
                _ = &mut utxo_notification_cleanup_timer => {
×
1647
                    log_slow_scope!(fn_name!() + "::select::utxo_notification_cleanup_timer");
×
1648

×
1649
                    debug!("Timer: UTXO notification pool cleanup job");
×
1650

1651
                    // Danger: possible loss of funds.
1652
                    //
1653
                    // See description of prune_stale_expected_utxos().
1654
                    //
1655
                    // This call is disabled until such time as a thorough
1656
                    // evaluation and perhaps reimplementation determines that
1657
                    // it can be called safely without possible loss of funds.
1658
                    // self.global_state_lock.lock_mut(|s| s.wallet_state.prune_stale_expected_utxos()).await;
1659

1660
                    utxo_notification_cleanup_timer.as_mut().reset(tokio::time::Instant::now() + utxo_notification_cleanup_interval);
×
1661
                }
1662

1663
                // Handle membership proof resynchronization
1664
                _ = &mut mp_resync_timer => {
×
1665
                    log_slow_scope!(fn_name!() + "::select::mp_resync_timer");
×
1666

×
1667
                    debug!("Timer: Membership proof resync job");
×
1668
                    self.global_state_lock.resync_membership_proofs().await?;
×
1669

1670
                    mp_resync_timer.as_mut().reset(tokio::time::Instant::now() + mp_resync_interval);
×
1671
                }
1672

1673
                // Check if it's time to run the proof upgrader
1674
                _ = &mut tx_proof_upgrade_timer => {
×
1675
                    log_slow_scope!(fn_name!() + "::select::tx_upgrade_proof_timer");
×
1676

×
1677
                    trace!("Timer: tx-proof-upgrader");
×
1678
                    self.proof_upgrader(&mut main_loop_state).await?;
×
1679

1680
                    tx_proof_upgrade_timer.as_mut().reset(tokio::time::Instant::now() + tx_proof_upgrade_interval);
×
1681
                }
1682

1683
            }
1684
        };
1685

1686
        self.graceful_shutdown(main_loop_state.task_handles).await?;
×
1687
        info!("Shutdown completed.");
×
1688

1689
        Ok(exit_code)
×
1690
    }
×
1691

1692
    /// Handle messages from the RPC server. Returns `true` iff the client should shut down
1693
    /// after handling this message.
1694
    async fn handle_rpc_server_message(
×
1695
        &mut self,
×
1696
        msg: RPCServerToMain,
×
1697
        main_loop_state: &mut MutableMainLoopState,
×
1698
    ) -> Result<bool> {
×
1699
        match msg {
×
1700
            RPCServerToMain::BroadcastTx(transaction) => {
×
1701
                debug!(
×
1702
                    "`main` received following transaction from RPC Server. {} inputs, {} outputs. Synced to mutator set hash: {}",
×
1703
                    transaction.kernel.inputs.len(),
×
1704
                    transaction.kernel.outputs.len(),
×
1705
                    transaction.kernel.mutator_set_hash
×
1706
                );
1707

1708
                // insert transaction into mempool
1709
                self.global_state_lock
×
1710
                    .lock_guard_mut()
×
1711
                    .await
×
1712
                    .mempool_insert(*transaction.clone(), TransactionOrigin::Own)
×
1713
                    .await;
×
1714

1715
                // Is this a transaction we can share with peers? If so, share
1716
                // it immediately.
1717
                if let Ok(notification) = transaction.as_ref().try_into() {
×
1718
                    self.main_to_peer_broadcast_tx
×
1719
                        .send(MainToPeerTask::TransactionNotification(notification))?;
×
1720
                } else {
1721
                    // Otherwise, upgrade its proof quality, and share it by
1722
                    // spinning up the proof upgrader.
1723
                    let TransactionProof::Witness(primitive_witness) = transaction.proof else {
×
1724
                        panic!("Expected Primitive witness. Got: {:?}", transaction.proof);
×
1725
                    };
1726

1727
                    let vm_job_queue = self.global_state_lock.vm_job_queue().clone();
×
1728

×
1729
                    let proving_capability = self.global_state_lock.cli().proving_capability();
×
1730
                    let upgrade_job =
×
1731
                        UpgradeJob::from_primitive_witness(proving_capability, primitive_witness);
×
1732

×
1733
                    // note: handle_upgrade() hands off proving to the
×
1734
                    //       triton-vm job queue and waits for job completion.
×
1735
                    // note: handle_upgrade() broadcasts to peers on success.
×
1736

×
1737
                    let global_state_lock_clone = self.global_state_lock.clone();
×
1738
                    let main_to_peer_broadcast_tx_clone = self.main_to_peer_broadcast_tx.clone();
×
1739
                    let _proof_upgrader_task = tokio::task::Builder::new()
×
1740
                        .name("proof_upgrader")
×
1741
                        .spawn(async move {
×
1742
                        upgrade_job
×
1743
                            .handle_upgrade(
×
1744
                                &vm_job_queue,
×
1745
                                TransactionOrigin::Own,
×
1746
                                true,
×
1747
                                global_state_lock_clone,
×
1748
                                main_to_peer_broadcast_tx_clone,
×
1749
                            )
×
1750
                            .await
×
1751
                    })?;
×
1752

1753
                    // main_loop_state.proof_upgrader_task = Some(proof_upgrader_task);
1754
                    // If transaction could not be shared immediately because
1755
                    // it contains secret data, upgrade its proof-type.
1756
                }
1757

1758
                // do not shut down
1759
                Ok(false)
×
1760
            }
1761
            RPCServerToMain::BroadcastMempoolTransactions => {
1762
                info!("Broadcasting transaction notifications for all shareable transactions in mempool");
×
1763
                let state = self.global_state_lock.lock_guard().await;
×
1764
                let txs = state.mempool.get_sorted_iter().collect_vec();
×
1765
                for (txid, _) in txs {
×
1766
                    // Since a read-lock is held over global state, the
1767
                    // transaction must exist in the mempool.
1768
                    let tx = state
×
1769
                        .mempool
×
1770
                        .get(txid)
×
1771
                        .expect("Transaction from iter must exist in mempool");
×
1772
                    let notification = TransactionNotification::try_from(tx);
×
1773
                    match notification {
×
1774
                        Ok(notification) => {
×
1775
                            self.main_to_peer_broadcast_tx
×
1776
                                .send(MainToPeerTask::TransactionNotification(notification))?;
×
1777
                        }
1778
                        Err(error) => {
×
1779
                            warn!("{error}");
×
1780
                        }
1781
                    };
1782
                }
1783
                Ok(false)
×
1784
            }
1785
            RPCServerToMain::ProofOfWorkSolution(new_block) => {
×
1786
                info!("Handling PoW solution from RPC call");
×
1787

1788
                self.handle_self_guessed_block(main_loop_state, new_block)
×
1789
                    .await?;
×
1790
                Ok(false)
×
1791
            }
1792
            RPCServerToMain::PauseMiner => {
1793
                info!("Received RPC request to stop miner");
×
1794

1795
                self.main_to_miner_tx.send(MainToMiner::StopMining);
×
1796
                Ok(false)
×
1797
            }
1798
            RPCServerToMain::RestartMiner => {
1799
                info!("Received RPC request to start miner");
×
1800
                self.main_to_miner_tx.send(MainToMiner::StartMining);
×
1801
                Ok(false)
×
1802
            }
1803
            RPCServerToMain::Shutdown => {
1804
                info!("Received RPC shutdown request.");
×
1805

1806
                // shut down
1807
                Ok(true)
×
1808
            }
1809
        }
1810
    }
×
1811

1812
    async fn graceful_shutdown(&mut self, task_handles: Vec<JoinHandle<()>>) -> Result<()> {
×
1813
        info!("Shutdown initiated.");
×
1814

1815
        // Stop mining
1816
        self.main_to_miner_tx.send(MainToMiner::Shutdown);
×
1817

×
1818
        // Send 'bye' message to all peers.
×
1819
        let _result = self
×
1820
            .main_to_peer_broadcast_tx
×
1821
            .send(MainToPeerTask::DisconnectAll());
×
1822
        debug!("sent bye");
×
1823

1824
        // Flush all databases
1825
        self.global_state_lock.flush_databases().await?;
×
1826

1827
        tokio::time::sleep(Duration::from_millis(50)).await;
×
1828

1829
        // Child processes should have finished by now. If not, abort them violently.
1830
        task_handles.iter().for_each(|jh| jh.abort());
×
1831

×
1832
        // wait for all to finish.
×
1833
        futures::future::join_all(task_handles).await;
×
1834

1835
        Ok(())
×
1836
    }
×
1837
}
1838

1839
#[cfg(test)]
1840
mod test {
1841
    use std::str::FromStr;
1842
    use std::time::UNIX_EPOCH;
1843

1844
    use tracing_test::traced_test;
1845

1846
    use super::*;
1847
    use crate::config_models::cli_args;
1848
    use crate::config_models::network::Network;
1849
    use crate::tests::shared::get_dummy_peer_incoming;
1850
    use crate::tests::shared::get_test_genesis_setup;
1851
    use crate::tests::shared::invalid_empty_block;
1852
    use crate::MINER_CHANNEL_CAPACITY;
1853

1854
    struct TestSetup {
1855
        peer_to_main_rx: mpsc::Receiver<PeerTaskToMain>,
1856
        miner_to_main_rx: mpsc::Receiver<MinerToMain>,
1857
        rpc_server_to_main_rx: mpsc::Receiver<RPCServerToMain>,
1858
        task_join_handles: Vec<JoinHandle<()>>,
1859
        main_loop_handler: MainLoopHandler,
1860
        main_to_peer_rx: broadcast::Receiver<MainToPeerTask>,
1861
    }
1862

1863
    async fn setup(num_init_peers_outgoing: u8, num_peers_incoming: u8) -> TestSetup {
10✔
1864
        const CHANNEL_CAPACITY_MINER_TO_MAIN: usize = 10;
1865

1866
        let network = Network::Main;
10✔
1867
        let (
1868
            main_to_peer_tx,
10✔
1869
            main_to_peer_rx,
10✔
1870
            peer_to_main_tx,
10✔
1871
            peer_to_main_rx,
10✔
1872
            mut state,
10✔
1873
            _own_handshake_data,
10✔
1874
        ) = get_test_genesis_setup(network, num_init_peers_outgoing, cli_args::Args::default())
10✔
1875
            .await
10✔
1876
            .unwrap();
10✔
1877
        assert!(
10✔
1878
            state
10✔
1879
                .lock_guard()
10✔
1880
                .await
10✔
1881
                .net
1882
                .peer_map
1883
                .iter()
10✔
1884
                .all(|(_addr, peer)| peer.connection_is_outbound()),
51✔
1885
            "Test assumption: All initial peers must represent outgoing connections."
×
1886
        );
1887

1888
        for i in 0..num_peers_incoming {
26✔
1889
            let peer_address =
26✔
1890
                std::net::SocketAddr::from_str(&format!("255.254.253.{}:8080", i)).unwrap();
26✔
1891
            state
26✔
1892
                .lock_guard_mut()
26✔
1893
                .await
26✔
1894
                .net
1895
                .peer_map
1896
                .insert(peer_address, get_dummy_peer_incoming(peer_address));
26✔
1897
        }
1898

1899
        let incoming_peer_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
10✔
1900

10✔
1901
        let (main_to_miner_tx, _main_to_miner_rx) =
10✔
1902
            mpsc::channel::<MainToMiner>(MINER_CHANNEL_CAPACITY);
10✔
1903
        let (_miner_to_main_tx, miner_to_main_rx) =
10✔
1904
            mpsc::channel::<MinerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
10✔
1905
        let (_rpc_server_to_main_tx, rpc_server_to_main_rx) =
10✔
1906
            mpsc::channel::<RPCServerToMain>(CHANNEL_CAPACITY_MINER_TO_MAIN);
10✔
1907

10✔
1908
        let main_loop_handler = MainLoopHandler::new(
10✔
1909
            incoming_peer_listener,
10✔
1910
            state,
10✔
1911
            main_to_peer_tx,
10✔
1912
            peer_to_main_tx,
10✔
1913
            main_to_miner_tx,
10✔
1914
        );
10✔
1915

10✔
1916
        let task_join_handles = vec![];
10✔
1917

10✔
1918
        TestSetup {
10✔
1919
            miner_to_main_rx,
10✔
1920
            peer_to_main_rx,
10✔
1921
            rpc_server_to_main_rx,
10✔
1922
            task_join_handles,
10✔
1923
            main_loop_handler,
10✔
1924
            main_to_peer_rx,
10✔
1925
        }
10✔
1926
    }
10✔
1927

1928
    #[tokio::test]
1929
    async fn handle_self_guessed_block_new_tip() {
1✔
1930
        // A new tip is registered by main_loop. Verify correct state update.
1✔
1931
        let test_setup = setup(1, 0).await;
1✔
1932
        let TestSetup {
1✔
1933
            task_join_handles,
1✔
1934
            mut main_loop_handler,
1✔
1935
            mut main_to_peer_rx,
1✔
1936
            ..
1✔
1937
        } = test_setup;
1✔
1938
        let cli_args = main_loop_handler.global_state_lock.cli();
1✔
1939
        let mut mutable_main_loop_state = MutableMainLoopState::new(cli_args, task_join_handles);
1✔
1940

1✔
1941
        let network = main_loop_handler.global_state_lock.cli().network;
1✔
1942
        let block1 = invalid_empty_block(&Block::genesis(network));
1✔
1943

1✔
1944
        assert!(
1✔
1945
            main_loop_handler
1✔
1946
                .global_state_lock
1✔
1947
                .lock_guard()
1✔
1948
                .await
1✔
1949
                .chain
1✔
1950
                .light_state()
1✔
1951
                .header()
1✔
1952
                .height
1✔
1953
                .is_genesis(),
1✔
1954
            "Tip must be genesis prior to handling of new block"
1✔
1955
        );
1✔
1956

1✔
1957
        let block1 = Box::new(block1);
1✔
1958
        main_loop_handler
1✔
1959
            .handle_self_guessed_block(&mut mutable_main_loop_state, block1.clone())
1✔
1960
            .await
1✔
1961
            .unwrap();
1✔
1962
        let new_block_height: u64 = main_loop_handler
1✔
1963
            .global_state_lock
1✔
1964
            .lock_guard()
1✔
1965
            .await
1✔
1966
            .chain
1✔
1967
            .light_state()
1✔
1968
            .header()
1✔
1969
            .height
1✔
1970
            .into();
1✔
1971
        assert_eq!(
1✔
1972
            1u64, new_block_height,
1✔
1973
            "Tip height must be 1 after handling of new block"
1✔
1974
        );
1✔
1975
        let msg_to_peer_loops = main_to_peer_rx.recv().await.unwrap();
1✔
1976
        if let MainToPeerTask::Block(block_to_peers) = msg_to_peer_loops {
1✔
1977
            assert_eq!(
1✔
1978
                block1, block_to_peers,
1✔
1979
                "Peer loops must have received block 1"
1✔
1980
            );
1✔
1981
        } else {
1✔
1982
            panic!("Must have sent block notification to peer loops")
1✔
1983
        }
1✔
1984
    }
1✔
1985

1986
    mod sync_mode {
1987
        use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator;
1988
        use test_strategy::proptest;
1989

1990
        use super::*;
1991
        use crate::tests::shared::get_dummy_socket_address;
1992

1993
        #[proptest]
256✔
1994
        fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) {
1✔
1995
            batch_request_heights_sanity(own_height);
1996
        }
1997

1998
        #[test]
1999
        fn batch_request_heights_unit() {
1✔
2000
            let own_height = 1_000_000u64;
1✔
2001
            batch_request_heights_sanity(own_height);
1✔
2002
        }
1✔
2003

2004
        fn batch_request_heights_sanity(own_height: u64) {
257✔
2005
            let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into());
257✔
2006

257✔
2007
            let mut heights_rev = heights.clone();
257✔
2008
            heights_rev.reverse();
257✔
2009
            assert!(
257✔
2010
                heights_rev.is_sorted(),
257✔
2011
                "Heights must be sorted from high-to-low"
×
2012
            );
2013

2014
            heights_rev.dedup();
257✔
2015
            assert_eq!(heights_rev.len(), heights.len(), "duplicates");
257✔
2016

2017
            assert_eq!(heights[0], own_height.into(), "starts with own tip height");
257✔
2018
            assert!(
257✔
2019
                heights.last().unwrap().is_genesis(),
257✔
2020
                "ends with genesis block"
×
2021
            );
2022
        }
257✔
2023

2024
        #[tokio::test]
2025
        #[traced_test]
×
2026
        async fn sync_mode_abandoned_on_global_timeout() {
1✔
2027
            let num_outgoing_connections = 0;
1✔
2028
            let num_incoming_connections = 0;
1✔
2029
            let test_setup = setup(num_outgoing_connections, num_incoming_connections).await;
1✔
2030
            let TestSetup {
2031
                task_join_handles,
1✔
2032
                mut main_loop_handler,
1✔
2033
                ..
1✔
2034
            } = test_setup;
1✔
2035

1✔
2036
            let cli_args = main_loop_handler.global_state_lock.cli();
1✔
2037
            let mut mutable_main_loop_state =
1✔
2038
                MutableMainLoopState::new(cli_args, task_join_handles);
1✔
2039

1✔
2040
            main_loop_handler
1✔
2041
                .block_sync(&mut mutable_main_loop_state)
1✔
2042
                .await
1✔
2043
                .expect("Must return OK when no sync mode is set");
1✔
2044

1✔
2045
            // Mock that we are in a valid sync state
1✔
2046
            let claimed_max_height = 1_000u64.into();
1✔
2047
            let claimed_max_pow = ProofOfWork::new([100; 6]);
1✔
2048
            main_loop_handler
1✔
2049
                .global_state_lock
1✔
2050
                .lock_guard_mut()
1✔
2051
                .await
1✔
2052
                .net
1✔
2053
                .sync_anchor = Some(SyncAnchor::new(
1✔
2054
                claimed_max_pow,
1✔
2055
                MmrAccumulator::new_from_leafs(vec![]),
1✔
2056
            ));
1✔
2057
            mutable_main_loop_state.sync_state.peer_sync_states.insert(
1✔
2058
                get_dummy_socket_address(0),
1✔
2059
                PeerSynchronizationState::new(claimed_max_height, claimed_max_pow),
1✔
2060
            );
1✔
2061

2062
            let sync_start_time = main_loop_handler
1✔
2063
                .global_state_lock
1✔
2064
                .lock_guard()
1✔
2065
                .await
1✔
2066
                .net
2067
                .sync_anchor
2068
                .as_ref()
1✔
2069
                .unwrap()
1✔
2070
                .updated;
1✔
2071
            main_loop_handler
1✔
2072
                .block_sync(&mut mutable_main_loop_state)
1✔
2073
                .await
1✔
2074
                .expect("Must return OK when sync mode has not timed out yet");
1✔
2075
            assert!(
1✔
2076
                main_loop_handler
1✔
2077
                    .global_state_lock
1✔
2078
                    .lock_guard()
1✔
2079
                    .await
1✔
2080
                    .net
2081
                    .sync_anchor
2082
                    .is_some(),
1✔
2083
                "Sync mode must still be set before timeout has occurred"
×
2084
            );
2085

2086
            assert_eq!(
1✔
2087
                sync_start_time,
1✔
2088
                main_loop_handler
1✔
2089
                    .global_state_lock
1✔
2090
                    .lock_guard()
1✔
2091
                    .await
1✔
2092
                    .net
2093
                    .sync_anchor
2094
                    .as_ref()
1✔
2095
                    .unwrap()
1✔
2096
                    .updated,
2097
                "timestamp may not be updated without state change"
×
2098
            );
2099

2100
            // Mock that sync-mode has timed out
2101
            main_loop_handler = main_loop_handler.with_mocked_time(
1✔
2102
                SystemTime::now()
1✔
2103
                    + Duration::from_secs(GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS + 1),
1✔
2104
            );
1✔
2105

1✔
2106
            main_loop_handler
1✔
2107
                .block_sync(&mut mutable_main_loop_state)
1✔
2108
                .await
1✔
2109
                .expect("Must return OK when sync mode has timed out");
1✔
2110
            assert!(
1✔
2111
                main_loop_handler
1✔
2112
                    .global_state_lock
1✔
2113
                    .lock_guard()
1✔
2114
                    .await
1✔
2115
                    .net
1✔
2116
                    .sync_anchor
1✔
2117
                    .is_none(),
1✔
2118
                "Sync mode must be unset on timeout"
1✔
2119
            );
1✔
2120
        }
1✔
2121
    }
2122

2123
    mod proof_upgrader {
2124
        use super::*;
2125
        use crate::job_queue::triton_vm::TritonVmJobQueue;
2126
        use crate::models::blockchain::transaction::Transaction;
2127
        use crate::models::blockchain::transaction::TransactionProof;
2128
        use crate::models::blockchain::type_scripts::native_currency_amount::NativeCurrencyAmount;
2129
        use crate::models::peer::transfer_transaction::TransactionProofQuality;
2130
        use crate::models::proof_abstractions::timestamp::Timestamp;
2131
        use crate::models::state::wallet::utxo_notification::UtxoNotificationMedium;
2132

2133
        async fn tx_no_outputs(
1✔
2134
            global_state_lock: &GlobalStateLock,
1✔
2135
            tx_proof_type: TxProvingCapability,
1✔
2136
            fee: NativeCurrencyAmount,
1✔
2137
        ) -> Transaction {
1✔
2138
            let change_key = global_state_lock
1✔
2139
                .lock_guard()
1✔
2140
                .await
1✔
2141
                .wallet_state
2142
                .wallet_entropy
2143
                .nth_generation_spending_key_for_tests(0);
1✔
2144
            let in_seven_months = global_state_lock
1✔
2145
                .lock_guard()
1✔
2146
                .await
1✔
2147
                .chain
2148
                .light_state()
1✔
2149
                .header()
1✔
2150
                .timestamp
1✔
2151
                + Timestamp::months(7);
1✔
2152

2153
            let global_state = global_state_lock.lock_guard().await;
1✔
2154
            global_state
1✔
2155
                .create_transaction_with_prover_capability(
1✔
2156
                    vec![].into(),
1✔
2157
                    change_key.into(),
1✔
2158
                    UtxoNotificationMedium::OffChain,
1✔
2159
                    fee,
1✔
2160
                    in_seven_months,
1✔
2161
                    tx_proof_type,
1✔
2162
                    &TritonVmJobQueue::dummy(),
1✔
2163
                )
1✔
2164
                .await
1✔
2165
                .unwrap()
1✔
2166
                .0
1✔
2167
        }
1✔
2168

2169
        #[tokio::test]
2170
        #[traced_test]
×
2171
        async fn upgrade_proof_collection_to_single_proof_foreign_tx() {
1✔
2172
            let num_outgoing_connections = 0;
1✔
2173
            let num_incoming_connections = 0;
1✔
2174
            let test_setup = setup(num_outgoing_connections, num_incoming_connections).await;
1✔
2175
            let TestSetup {
2176
                peer_to_main_rx,
1✔
2177
                miner_to_main_rx,
1✔
2178
                rpc_server_to_main_rx,
1✔
2179
                task_join_handles,
1✔
2180
                mut main_loop_handler,
1✔
2181
                mut main_to_peer_rx,
1✔
2182
            } = test_setup;
1✔
2183

1✔
2184
            // Force instance to create SingleProofs, otherwise CI and other
1✔
2185
            // weak machines fail.
1✔
2186
            let mocked_cli = cli_args::Args {
1✔
2187
                tx_proving_capability: Some(TxProvingCapability::SingleProof),
1✔
2188
                tx_proof_upgrade_interval: 100, // seconds
1✔
2189
                ..Default::default()
1✔
2190
            };
1✔
2191

1✔
2192
            main_loop_handler
1✔
2193
                .global_state_lock
1✔
2194
                .set_cli(mocked_cli)
1✔
2195
                .await;
1✔
2196
            let mut main_loop_handler = main_loop_handler.with_mocked_time(SystemTime::now());
1✔
2197
            let cli_args = main_loop_handler.global_state_lock.cli();
1✔
2198
            let mut mutable_main_loop_state =
1✔
2199
                MutableMainLoopState::new(cli_args, task_join_handles);
1✔
2200

1✔
2201
            assert!(
1✔
2202
                main_loop_handler
1✔
2203
                    .proof_upgrader(&mut mutable_main_loop_state)
1✔
2204
                    .await
1✔
2205
                    .is_ok(),
1✔
2206
                "Scheduled task returns OK when run on empty mempool"
×
2207
            );
2208

2209
            let fee = NativeCurrencyAmount::coins(1);
1✔
2210
            let proof_collection_tx = tx_no_outputs(
1✔
2211
                &main_loop_handler.global_state_lock,
1✔
2212
                TxProvingCapability::ProofCollection,
1✔
2213
                fee,
1✔
2214
            )
1✔
2215
            .await;
1✔
2216

2217
            main_loop_handler
1✔
2218
                .global_state_lock
1✔
2219
                .lock_guard_mut()
1✔
2220
                .await
1✔
2221
                .mempool_insert(proof_collection_tx.clone(), TransactionOrigin::Foreign)
1✔
2222
                .await;
1✔
2223

2224
            assert!(
1✔
2225
                main_loop_handler
1✔
2226
                    .proof_upgrader(&mut mutable_main_loop_state)
1✔
2227
                    .await
1✔
2228
                    .is_ok(),
1✔
2229
                "Scheduled task returns OK when it's not yet time to upgrade"
×
2230
            );
2231

2232
            assert!(
1✔
2233
                matches!(
×
2234
                    main_loop_handler
1✔
2235
                        .global_state_lock
1✔
2236
                        .lock_guard()
1✔
2237
                        .await
1✔
2238
                        .mempool
2239
                        .get(proof_collection_tx.kernel.txid())
1✔
2240
                        .unwrap()
1✔
2241
                        .proof,
2242
                    TransactionProof::ProofCollection(_)
2243
                ),
2244
                "Proof in mempool must still be of type proof collection"
×
2245
            );
2246

2247
            // Mock that enough time has passed to perform the upgrade. Then
2248
            // perform the upgrade.
2249
            let mut main_loop_handler =
1✔
2250
                main_loop_handler.with_mocked_time(SystemTime::now() + Duration::from_secs(300));
1✔
2251
            assert!(
1✔
2252
                main_loop_handler
1✔
2253
                    .proof_upgrader(&mut mutable_main_loop_state)
1✔
2254
                    .await
1✔
2255
                    .is_ok(),
1✔
2256
                "Scheduled task must return OK when it's time to upgrade"
×
2257
            );
2258

2259
            // Wait for upgrade task to finish.
2260
            let handle = mutable_main_loop_state.proof_upgrader_task.unwrap().await;
1✔
2261
            assert!(
1✔
2262
                handle.is_ok(),
1✔
2263
                "Proof-upgrade task must finish successfully."
×
2264
            );
2265

2266
            // At this point there should be one transaction in the mempool,
2267
            // which is (if all is well) the merger of the ProofCollection
2268
            // transaction inserted above and one of the upgrader's fee
2269
            // gobblers. The point is that this transaction is a SingleProof
2270
            // transaction, so test that.
2271

2272
            let (merged_txid, _) = main_loop_handler
1✔
2273
                .global_state_lock
1✔
2274
                .lock_guard()
1✔
2275
                .await
1✔
2276
                .mempool
2277
                .get_sorted_iter()
1✔
2278
                .next_back()
1✔
2279
                .expect("mempool should contain one item here");
1✔
2280

1✔
2281
            assert!(
1✔
2282
                matches!(
×
2283
                    main_loop_handler
1✔
2284
                        .global_state_lock
1✔
2285
                        .lock_guard()
1✔
2286
                        .await
1✔
2287
                        .mempool
2288
                        .get(merged_txid)
1✔
2289
                        .unwrap()
1✔
2290
                        .proof,
2291
                    TransactionProof::SingleProof(_)
2292
                ),
2293
                "Proof in mempool must now be of type single proof"
×
2294
            );
2295

2296
            match main_to_peer_rx.recv().await {
1✔
2297
                Ok(MainToPeerTask::TransactionNotification(tx_noti)) => {
1✔
2298
                    assert_eq!(merged_txid, tx_noti.txid);
1✔
2299
                    assert_eq!(TransactionProofQuality::SingleProof, tx_noti.proof_quality);
1✔
2300
                }
2301
                other => panic!("Must have sent transaction notification to peer loop after successful proof upgrade. Got:\n{other:?}"),
×
2302
            }
2303

2304
            // These values are kept alive as the transmission-counterpart will
2305
            // otherwise fail on `send`.
2306
            drop(peer_to_main_rx);
1✔
2307
            drop(miner_to_main_rx);
1✔
2308
            drop(rpc_server_to_main_rx);
1✔
2309
            drop(main_to_peer_rx);
1✔
2310
        }
1✔
2311
    }
2312

2313
    mod peer_discovery {
2314
        use super::*;
2315
        use crate::models::peer::bootstrap_info::BootstrapInfo;
2316
        use crate::tests::shared::get_dummy_socket_address;
2317

2318
        #[tokio::test]
2319
        #[traced_test]
×
2320
        async fn prune_peers_too_many_connections() {
1✔
2321
            let num_init_peers_outgoing = 10;
1✔
2322
            let num_init_peers_incoming = 4;
1✔
2323
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2324
            let TestSetup {
2325
                mut main_to_peer_rx,
1✔
2326
                mut main_loop_handler,
1✔
2327
                ..
1✔
2328
            } = test_setup;
1✔
2329

1✔
2330
            let mocked_cli = cli_args::Args {
1✔
2331
                max_num_peers: num_init_peers_outgoing as usize,
1✔
2332
                ..Default::default()
1✔
2333
            };
1✔
2334

1✔
2335
            main_loop_handler
1✔
2336
                .global_state_lock
1✔
2337
                .set_cli(mocked_cli)
1✔
2338
                .await;
1✔
2339

2340
            main_loop_handler.prune_peers().await.unwrap();
1✔
2341
            assert_eq!(4, main_to_peer_rx.len());
1✔
2342
            for _ in 0..4 {
5✔
2343
                let peer_msg = main_to_peer_rx.recv().await.unwrap();
4✔
2344
                assert!(matches!(peer_msg, MainToPeerTask::Disconnect(_)))
4✔
2345
            }
1✔
2346
        }
1✔
2347

2348
        #[tokio::test]
2349
        #[traced_test]
×
2350
        async fn prune_peers_not_too_many_connections() {
1✔
2351
            let num_init_peers_outgoing = 10;
1✔
2352
            let num_init_peers_incoming = 1;
1✔
2353
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2354
            let TestSetup {
2355
                main_to_peer_rx,
1✔
2356
                mut main_loop_handler,
1✔
2357
                ..
1✔
2358
            } = test_setup;
1✔
2359

1✔
2360
            let mocked_cli = cli_args::Args {
1✔
2361
                max_num_peers: 200,
1✔
2362
                ..Default::default()
1✔
2363
            };
1✔
2364

1✔
2365
            main_loop_handler
1✔
2366
                .global_state_lock
1✔
2367
                .set_cli(mocked_cli)
1✔
2368
                .await;
1✔
2369

2370
            main_loop_handler.prune_peers().await.unwrap();
1✔
2371
            assert!(main_to_peer_rx.is_empty());
1✔
2372
        }
1✔
2373

2374
        #[tokio::test]
2375
        #[traced_test]
1✔
2376
        async fn skip_peer_discovery_if_peer_limit_is_exceeded() {
1✔
2377
            let num_init_peers_outgoing = 2;
1✔
2378
            let num_init_peers_incoming = 0;
1✔
2379
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2380
            let TestSetup {
2381
                task_join_handles,
1✔
2382
                mut main_loop_handler,
1✔
2383
                ..
1✔
2384
            } = test_setup;
1✔
2385

1✔
2386
            let mocked_cli = cli_args::Args {
1✔
2387
                max_num_peers: 0,
1✔
2388
                ..Default::default()
1✔
2389
            };
1✔
2390
            main_loop_handler
1✔
2391
                .global_state_lock
1✔
2392
                .set_cli(mocked_cli)
1✔
2393
                .await;
1✔
2394
            let cli_args = main_loop_handler.global_state_lock.cli();
1✔
2395
            main_loop_handler
1✔
2396
                .discover_peers(&mut MutableMainLoopState::new(cli_args, task_join_handles))
1✔
2397
                .await
1✔
2398
                .unwrap();
1✔
2399

1✔
2400
            assert!(logs_contain("Skipping peer discovery."));
1✔
2401
        }
1✔
2402

2403
        #[tokio::test]
2404
        #[traced_test]
1✔
2405
        async fn performs_peer_discovery_on_few_connections() {
1✔
2406
            let num_init_peers_outgoing = 2;
1✔
2407
            let num_init_peers_incoming = 0;
1✔
2408
            let TestSetup {
2409
                task_join_handles,
1✔
2410
                mut main_loop_handler,
1✔
2411
                mut main_to_peer_rx,
1✔
2412
                peer_to_main_rx: _keep_channel_open,
1✔
2413
                ..
2414
            } = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2415

2416
            // Set CLI to attempt to make more connections
2417
            let mocked_cli = cli_args::Args {
1✔
2418
                max_num_peers: 10,
1✔
2419
                ..Default::default()
1✔
2420
            };
1✔
2421
            main_loop_handler
1✔
2422
                .global_state_lock
1✔
2423
                .set_cli(mocked_cli)
1✔
2424
                .await;
1✔
2425
            let cli_args = main_loop_handler.global_state_lock.cli();
1✔
2426
            main_loop_handler
1✔
2427
                .discover_peers(&mut MutableMainLoopState::new(cli_args, task_join_handles))
1✔
2428
                .await
1✔
2429
                .unwrap();
1✔
2430

1✔
2431
            let peer_discovery_sent_messages_on_peer_channel = main_to_peer_rx.try_recv().is_ok();
1✔
2432
            assert!(peer_discovery_sent_messages_on_peer_channel);
1✔
2433
            assert!(logs_contain("Performing peer discovery"));
1✔
2434
        }
1✔
2435

2436
        #[tokio::test]
NEW
2437
        #[traced_test]
×
2438
        async fn poorly_connected_node_tries_to_connect_to_bootstrap_node() {
1✔
2439
            let mut global_state_lock = setup(1, 1).await.main_loop_handler.global_state_lock;
1✔
2440
            let cli_args = cli_args::Args {
1✔
2441
                max_num_peers: 100,
1✔
2442
                ..Default::default()
1✔
2443
            };
1✔
2444
            global_state_lock.set_cli(cli_args).await;
1✔
2445
            let (peer_candidate, peer_state) =
1✔
2446
                set_up_peer_candidate_test(global_state_lock.clone()).await;
1✔
2447

2448
            let net_state = &global_state_lock.lock_guard().await.net;
1✔
2449
            let chosen_candidate = peer_state.peer_candidate(net_state);
1✔
2450
            assert_eq!(Some(peer_candidate), chosen_candidate);
1✔
2451
        }
1✔
2452

2453
        #[tokio::test]
NEW
2454
        #[traced_test]
×
2455
        async fn well_connected_node_does_not_try_to_connect_to_bootstrap_nodes() {
1✔
2456
            let mut global_state_lock = setup(20, 20).await.main_loop_handler.global_state_lock;
1✔
2457
            let cli_args = cli_args::Args {
1✔
2458
                max_num_peers: 42,
1✔
2459
                ..Default::default()
1✔
2460
            };
1✔
2461
            global_state_lock.set_cli(cli_args).await;
1✔
2462
            let (_, peer_state) = set_up_peer_candidate_test(global_state_lock.clone()).await;
1✔
2463

2464
            let net_state = &global_state_lock.lock_guard().await.net;
1✔
2465
            let chosen_candidate = peer_state.peer_candidate(net_state);
1✔
2466
            assert!(chosen_candidate.is_none());
1✔
2467
        }
1✔
2468

2469
        /// Create a [PotentialPeersState] and [add](PotentialPeersState::add)
2470
        /// one [peer candidate](PeerCandidate) that is a
2471
        /// [bootstrap node](BootstrapStatus::Bootstrap). The added peer
2472
        /// candidate as well as the resulting [PotentialPeersState] are
2473
        /// returned.
2474
        ///
2475
        ///   * acquires `global_state_lock` for write
2476
        async fn set_up_peer_candidate_test(
2✔
2477
            mut global_state_lock: GlobalStateLock,
2✔
2478
        ) -> (PeerCandidate, PotentialPeersState) {
2✔
2479
            // try to avoid collisions: peer ids are implicit in the setup
2✔
2480
            let peer_id = 255;
2✔
2481
            let peer_socket_address = get_dummy_socket_address(peer_id as u8);
2✔
2482
            let peer_bootstrap_info = BootstrapInfo::new(BootstrapStatus::Bootstrap);
2✔
2483
            global_state_lock
2✔
2484
                .lock_guard_mut()
2✔
2485
                .await
2✔
2486
                .net
2487
                .bootstrap_status
2488
                .insert(peer_socket_address, peer_bootstrap_info);
2✔
2489

2✔
2490
            let peer_candidate = PeerCandidate::new(peer_socket_address, peer_id, 1);
2✔
2491
            let mut peer_state = PotentialPeersState::new(global_state_lock.cli());
2✔
2492
            peer_state.add(peer_candidate);
2✔
2493
            (peer_candidate, peer_state)
2✔
2494
        }
2✔
2495
    }
2496

2497
    #[test]
2498
    fn older_systemtime_ranks_first() {
1✔
2499
        let start = UNIX_EPOCH;
1✔
2500
        let other = UNIX_EPOCH + Duration::from_secs(1000);
1✔
2501
        let mut instants = [start, other];
1✔
2502

1✔
2503
        assert_eq!(
1✔
2504
            start,
1✔
2505
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
1✔
2506
        );
1✔
2507

2508
        instants.reverse();
1✔
2509

1✔
2510
        assert_eq!(
1✔
2511
            start,
1✔
2512
            instants.iter().copied().min_by(|l, r| l.cmp(r)).unwrap()
1✔
2513
        );
1✔
2514
    }
1✔
2515

2516
    mod bootstrap_mode {
2517
        use rand::Rng;
2518

2519
        use super::*;
2520
        use crate::models::peer::PeerMessage;
2521
        use crate::models::peer::TransferConnectionStatus;
2522
        use crate::tests::shared::get_dummy_peer_connection_data_genesis;
2523
        use crate::tests::shared::to_bytes;
2524

2525
        #[tokio::test]
2526
        #[traced_test]
×
2527
        async fn disconnect_from_oldest_peer_upon_connection_request() {
1✔
2528
            // Set up a node in bootstrap mode and connected to a given
1✔
2529
            // number of peers, which is one less than the maximum. Initiate a
1✔
2530
            // connection request. Verify that the oldest of the existing
1✔
2531
            // connections is dropped.
1✔
2532

1✔
2533
            let network = Network::Main;
1✔
2534
            let num_init_peers_outgoing = 5;
1✔
2535
            let num_init_peers_incoming = 0;
1✔
2536
            let test_setup = setup(num_init_peers_outgoing, num_init_peers_incoming).await;
1✔
2537
            let TestSetup {
2538
                mut peer_to_main_rx,
1✔
2539
                task_join_handles,
1✔
2540
                mut main_loop_handler,
1✔
2541
                mut main_to_peer_rx,
1✔
2542
                ..
1✔
2543
            } = test_setup;
1✔
2544

1✔
2545
            let mocked_cli = cli_args::Args {
1✔
2546
                max_num_peers: usize::from(num_init_peers_outgoing) + 1,
1✔
2547
                bootstrap: true,
1✔
2548
                network,
1✔
2549
                ..Default::default()
1✔
2550
            };
1✔
2551
            main_loop_handler
1✔
2552
                .global_state_lock
1✔
2553
                .set_cli(mocked_cli)
1✔
2554
                .await;
1✔
2555

2556
            let cli_args = main_loop_handler.global_state_lock.cli();
1✔
2557
            let mut mutable_main_loop_state =
1✔
2558
                MutableMainLoopState::new(cli_args, task_join_handles);
1✔
2559

1✔
2560
            // check sanity: at startup, we are connected to the initial number of peers
1✔
2561
            assert_eq!(
1✔
2562
                usize::from(num_init_peers_outgoing),
1✔
2563
                main_loop_handler
1✔
2564
                    .global_state_lock
1✔
2565
                    .lock_guard()
1✔
2566
                    .await
1✔
2567
                    .net
2568
                    .peer_map
2569
                    .len()
1✔
2570
            );
2571

2572
            // randomize "connection established" timestamps
2573
            let random_offset = || {
5✔
2574
                let unix_epoch_to_now_in_millis = SystemTime::now()
5✔
2575
                    .duration_since(UNIX_EPOCH)
5✔
2576
                    .unwrap()
5✔
2577
                    .as_millis() as u64;
5✔
2578
                Duration::from_millis(rand::rng().random_range(0..unix_epoch_to_now_in_millis))
5✔
2579
            };
5✔
2580
            main_loop_handler
1✔
2581
                .global_state_lock
1✔
2582
                .lock_guard_mut()
1✔
2583
                .await
1✔
2584
                .net
2585
                .peer_map
2586
                .iter_mut()
1✔
2587
                .for_each(|(_, peer_info)| {
5✔
2588
                    peer_info.set_connection_established(UNIX_EPOCH + random_offset());
5✔
2589
                });
5✔
2590

2591
            // compute which peer will be dropped, for later reference
2592
            let expected_drop_peer_socket_address = main_loop_handler
1✔
2593
                .global_state_lock
1✔
2594
                .lock_guard()
1✔
2595
                .await
1✔
2596
                .net
2597
                .peer_map
2598
                .iter()
1✔
2599
                .min_by(|(_, left), (_, right)| {
4✔
2600
                    left.connection_established()
4✔
2601
                        .cmp(&right.connection_established())
4✔
2602
                })
4✔
2603
                .map(|(socket_address, _)| socket_address)
1✔
2604
                .copied()
1✔
2605
                .unwrap();
1✔
2606

1✔
2607
            // simulate incoming connection
1✔
2608
            let (peer_handshake_data, peer_socket_address) =
1✔
2609
                get_dummy_peer_connection_data_genesis(network, 1);
1✔
2610
            let own_handshake_data = main_loop_handler
1✔
2611
                .global_state_lock
1✔
2612
                .lock_guard()
1✔
2613
                .await
1✔
2614
                .get_own_handshakedata();
1✔
2615
            assert_eq!(peer_handshake_data.network, own_handshake_data.network);
1✔
2616
            assert_eq!(peer_handshake_data.version, own_handshake_data.version);
1✔
2617
            let mock_stream = tokio_test::io::Builder::new()
1✔
2618
                .read(
1✔
2619
                    &to_bytes(&PeerMessage::Handshake(Box::new((
1✔
2620
                        crate::MAGIC_STRING_REQUEST.to_vec(),
1✔
2621
                        peer_handshake_data.clone(),
1✔
2622
                    ))))
1✔
2623
                    .unwrap(),
1✔
2624
                )
1✔
2625
                .write(
1✔
2626
                    &to_bytes(&PeerMessage::Handshake(Box::new((
1✔
2627
                        crate::MAGIC_STRING_RESPONSE.to_vec(),
1✔
2628
                        own_handshake_data.clone(),
1✔
2629
                    ))))
1✔
2630
                    .unwrap(),
1✔
2631
                )
1✔
2632
                .write(
1✔
2633
                    &to_bytes(&PeerMessage::ConnectionStatus(
1✔
2634
                        TransferConnectionStatus::Accepted,
1✔
2635
                    ))
1✔
2636
                    .unwrap(),
1✔
2637
                )
1✔
2638
                .build();
1✔
2639
            let peer_to_main_tx_clone = main_loop_handler.peer_task_to_main_tx.clone();
1✔
2640
            let global_state_lock_clone = main_loop_handler.global_state_lock.clone();
1✔
2641
            let main_to_peer_rx_clone = main_loop_handler.main_to_peer_broadcast_tx.subscribe();
1✔
2642
            let incoming_peer_task_handle = tokio::task::Builder::new()
1✔
2643
                .name("answer_peer_wrapper")
1✔
2644
                .spawn(async move {
1✔
2645
                    answer_peer(
1✔
2646
                        mock_stream,
1✔
2647
                        global_state_lock_clone,
1✔
2648
                        peer_socket_address,
1✔
2649
                        main_to_peer_rx_clone,
1✔
2650
                        peer_to_main_tx_clone,
1✔
2651
                        own_handshake_data,
1✔
2652
                    )
1✔
2653
                    .await
1✔
2654
                    .unwrap();
1✔
2655
                })
1✔
2656
                .unwrap();
1✔
2657

2658
            // `answer_peer_wrapper` should send a
2659
            // `DisconnectFromLongestLivedPeer` message to main
2660
            let peer_to_main_message = peer_to_main_rx.recv().await.unwrap();
1✔
2661
            assert!(matches!(
1✔
2662
                peer_to_main_message,
1✔
2663
                PeerTaskToMain::DisconnectFromLongestLivedPeer,
2664
            ));
2665

2666
            // process this message
2667
            main_loop_handler
1✔
2668
                .handle_peer_task_message(peer_to_main_message, &mut mutable_main_loop_state)
1✔
2669
                .await
1✔
2670
                .unwrap();
1✔
2671

2672
            // main loop should send a `Disconnect` message
2673
            let main_to_peers_message = main_to_peer_rx.recv().await.unwrap();
1✔
2674
            let MainToPeerTask::Disconnect(observed_drop_peer_socket_address) =
1✔
2675
                main_to_peers_message
1✔
2676
            else {
UNCOV
2677
                panic!("Expected disconnect, got {main_to_peers_message:?}");
×
2678
            };
2679

2680
            // matched observed droppee against expectation
2681
            assert_eq!(
1✔
2682
                expected_drop_peer_socket_address,
1✔
2683
                observed_drop_peer_socket_address,
1✔
2684
            );
1✔
2685
            println!("Dropped connection with {expected_drop_peer_socket_address}.");
1✔
2686

1✔
2687
            // don't forget to terminate the peer task, which is still running
1✔
2688
            incoming_peer_task_handle.abort();
1✔
2689
        }
1✔
2690
    }
2691
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc