• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 23943169302

03 Apr 2026 10:28AM UTC coverage: 77.573% (-8.1%) from 85.712%
23943169302

Pull #7076

github

7f2377
web-flow
Merge bb87ecec2 into c529ad924
Pull Request #7076: feat: sortition side-table copy and validation

3743 of 4318 new or added lines in 19 files covered. (86.68%)

19304 existing lines in 182 files now uncovered.

172097 of 221852 relevant lines covered (77.57%)

7722182.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/stacks-node/src/nakamoto_node/peer.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
use std::net::SocketAddr;
17
use std::sync::mpsc::TrySendError;
18
use std::thread;
19
use std::time::Duration;
20

21
use stacks::burnchains::db::BurnchainHeaderReader;
22
use stacks::burnchains::PoxConstants;
23
use stacks::chainstate::burn::db::sortdb::SortitionDB;
24
use stacks::chainstate::stacks::db::StacksChainState;
25
use stacks::chainstate::stacks::miner::signal_mining_blocked;
26
use stacks::core::mempool::MemPoolDB;
27
use stacks::cost_estimates::metrics::{CostMetric, UnitMetric};
28
use stacks::cost_estimates::{CostEstimator, FeeEstimator, UnitEstimator};
29
use stacks::net::dns::{DNSClient, DNSResolver};
30
use stacks::net::p2p::PeerNetwork;
31
use stacks::net::RPCHandlerArgs;
32
use stacks_common::util::hash::Sha256Sum;
33

34
use crate::burnchains::make_bitcoin_indexer;
35
use crate::nakamoto_node::relayer::RelayerDirective;
36
use crate::neon_node::open_chainstate_with_faults;
37
use crate::run_loop::nakamoto::{Globals, RunLoop};
38
use crate::{Config, EventDispatcher};
39

40
/// Thread that runs the network state machine, handling both p2p and http requests.
41
pub struct PeerThread {
42
    /// Node config
43
    config: Config,
44
    /// instance of the peer network. Made optional in order to trick the borrow checker.
45
    net: PeerNetwork,
46
    /// handle to global inter-thread comms
47
    globals: Globals,
48
    /// how long to wait for network messages on each poll, in millis
49
    poll_timeout: u64,
50
    /// handle to the sortition DB
51
    sortdb: SortitionDB,
52
    /// handle to the chainstate DB
53
    chainstate: StacksChainState,
54
    /// handle to the mempool DB
55
    mempool: MemPoolDB,
56
    /// Buffered network result relayer command.
57
    /// P2P network results are consolidated into a single directive.
58
    results_with_data: Option<RelayerDirective>,
59
    /// total number of p2p state-machine passes so far. Used to signal when to download the next
60
    /// reward cycle of blocks
61
    num_p2p_state_machine_passes: u64,
62
    /// total number of inventory state-machine passes so far. Used to signal when to download the
63
    /// next reward cycle of blocks.
64
    num_inv_sync_passes: u64,
65
    /// total number of download state-machine passes so far. Used to signal when to download the
66
    /// next reward cycle of blocks.
67
    num_download_passes: u64,
68
    /// last burnchain block seen in the PeerNetwork's chain view since the last run
69
    last_burn_block_height: u64,
70
}
71

72
impl PeerThread {
73
    /// Main loop of the p2p thread.
74
    /// Runs in a separate thread.
75
    /// Continuously receives, until told otherwise.
UNCOV
76
    pub fn main(mut self, event_dispatcher: EventDispatcher) {
×
UNCOV
77
        debug!("p2p thread ID is {:?}", thread::current().id());
×
UNCOV
78
        let should_keep_running = self.globals.should_keep_running.clone();
×
UNCOV
79
        let (mut dns_resolver, mut dns_client) = DNSResolver::new(10);
×
80

81
        // spawn a daemon thread that runs the DNS resolver.
82
        // It will die when the rest of the system dies.
83
        {
UNCOV
84
            let _jh = thread::Builder::new()
×
UNCOV
85
                .name("dns-resolver".to_string())
×
UNCOV
86
                .spawn(move || {
×
UNCOV
87
                    debug!("DNS resolver thread ID is {:?}", thread::current().id());
×
UNCOV
88
                    dns_resolver.thread_main();
×
UNCOV
89
                })
×
UNCOV
90
                .unwrap();
×
91
        }
92

93
        // NOTE: these must be instantiated in the thread context, since it can't be safely sent
94
        // between threads
UNCOV
95
        let fee_estimator_opt = self.config.make_fee_estimator();
×
UNCOV
96
        let cost_estimator = self
×
UNCOV
97
            .config
×
UNCOV
98
            .make_cost_estimator()
×
UNCOV
99
            .unwrap_or_else(|| Box::new(UnitEstimator));
×
UNCOV
100
        let cost_metric = self
×
UNCOV
101
            .config
×
UNCOV
102
            .make_cost_metric()
×
UNCOV
103
            .unwrap_or_else(|| Box::new(UnitMetric));
×
104

UNCOV
105
        let indexer = make_bitcoin_indexer(&self.config, Some(should_keep_running));
×
106

107
        // receive until we can't reach the receiver thread
108
        loop {
UNCOV
109
            if !self.globals.keep_running() {
×
UNCOV
110
                break;
×
UNCOV
111
            }
×
UNCOV
112
            if !self.run_one_pass(
×
UNCOV
113
                &indexer,
×
UNCOV
114
                Some(&mut dns_client),
×
UNCOV
115
                &event_dispatcher,
×
UNCOV
116
                &cost_estimator,
×
UNCOV
117
                &cost_metric,
×
UNCOV
118
                fee_estimator_opt.as_ref(),
×
UNCOV
119
            ) {
×
UNCOV
120
                break;
×
UNCOV
121
            }
×
122
        }
123

124
        // kill miner
UNCOV
125
        signal_mining_blocked(self.globals.get_miner_status());
×
126

127
        // set termination flag so other threads die
UNCOV
128
        self.globals.signal_stop();
×
129

130
        // thread exited, so signal to the relayer thread to die.
131
        while let Err(TrySendError::Full(_)) =
UNCOV
132
            self.globals.relay_send.try_send(RelayerDirective::Exit)
×
133
        {
UNCOV
134
            warn!("Failed to direct relayer thread to exit, sleeping and trying again");
×
UNCOV
135
            thread::sleep(Duration::from_secs(5));
×
136
        }
UNCOV
137
        info!("P2P thread exit!");
×
UNCOV
138
    }
×
139

140
    /// Instantiate the p2p thread.
141
    /// Binds the addresses in the config (which may panic if the port is blocked).
142
    /// This is so the node will crash "early" before any new threads start if there's going to be
143
    /// a bind error anyway.
UNCOV
144
    pub fn new(runloop: &RunLoop, net: PeerNetwork) -> PeerThread {
×
UNCOV
145
        Self::new_all(
×
UNCOV
146
            runloop.get_globals(),
×
UNCOV
147
            runloop.config(),
×
UNCOV
148
            runloop.get_burnchain().pox_constants,
×
UNCOV
149
            net,
×
150
        )
UNCOV
151
    }
×
152

UNCOV
153
    fn new_all(
×
UNCOV
154
        globals: Globals,
×
UNCOV
155
        config: &Config,
×
UNCOV
156
        pox_constants: PoxConstants,
×
UNCOV
157
        mut net: PeerNetwork,
×
UNCOV
158
    ) -> Self {
×
UNCOV
159
        let config = config.clone();
×
UNCOV
160
        let mempool = config
×
UNCOV
161
            .connect_mempool_db()
×
UNCOV
162
            .expect("FATAL: database failure opening mempool");
×
UNCOV
163
        let burn_db_path = config.get_burn_db_file_path();
×
164

UNCOV
165
        let sortdb = SortitionDB::open(
×
UNCOV
166
            &burn_db_path,
×
167
            false,
UNCOV
168
            pox_constants,
×
UNCOV
169
            Some(config.node.get_marf_opts()),
×
170
        )
UNCOV
171
        .expect("FATAL: could not open sortition DB");
×
172

UNCOV
173
        let chainstate =
×
UNCOV
174
            open_chainstate_with_faults(&config).expect("FATAL: could not open chainstate DB");
×
175

UNCOV
176
        let p2p_sock: SocketAddr = config
×
UNCOV
177
            .node
×
UNCOV
178
            .p2p_bind
×
UNCOV
179
            .parse()
×
UNCOV
180
            .unwrap_or_else(|_| panic!("Failed to parse socket: {}", &config.node.p2p_bind));
×
UNCOV
181
        let rpc_sock = config
×
UNCOV
182
            .node
×
UNCOV
183
            .rpc_bind
×
UNCOV
184
            .parse()
×
UNCOV
185
            .unwrap_or_else(|_| panic!("Failed to parse socket: {}", &config.node.rpc_bind));
×
186

UNCOV
187
        let did_bind = net
×
UNCOV
188
            .try_bind(&p2p_sock, &rpc_sock)
×
UNCOV
189
            .expect("BUG: PeerNetwork could not bind");
×
190

UNCOV
191
        if !did_bind {
×
UNCOV
192
            info!("`PeerNetwork::bind()` skipped, already bound");
×
UNCOV
193
        }
×
194

UNCOV
195
        let poll_timeout = config.get_poll_time();
×
UNCOV
196
        PeerThread {
×
UNCOV
197
            config,
×
UNCOV
198
            net,
×
UNCOV
199
            globals,
×
UNCOV
200
            poll_timeout,
×
UNCOV
201
            sortdb,
×
UNCOV
202
            chainstate,
×
UNCOV
203
            mempool,
×
UNCOV
204
            results_with_data: None,
×
UNCOV
205
            num_p2p_state_machine_passes: 0,
×
UNCOV
206
            num_inv_sync_passes: 0,
×
UNCOV
207
            num_download_passes: 0,
×
UNCOV
208
            last_burn_block_height: 0,
×
UNCOV
209
        }
×
UNCOV
210
    }
×
211

212
    /// Check if the StackerDB config needs to be updated (by looking
213
    ///  at the signal in `self.globals`), and if so, refresh the
214
    ///  StackerDB config
UNCOV
215
    fn refresh_stackerdb(&mut self) {
×
UNCOV
216
        if !self.globals.coord_comms.need_stackerdb_update() {
×
UNCOV
217
            return;
×
UNCOV
218
        }
×
219

UNCOV
220
        if let Err(e) = self
×
UNCOV
221
            .net
×
UNCOV
222
            .refresh_stacker_db_configs(&self.sortdb, &mut self.chainstate)
×
223
        {
224
            warn!("Failed to update StackerDB configs: {e}");
×
UNCOV
225
        }
×
226

UNCOV
227
        self.globals.coord_comms.set_stackerdb_update(false);
×
UNCOV
228
    }
×
229

230
    /// Run one pass of the p2p/http state machine
231
    /// Return true if we should continue running passes; false if not
232
    #[allow(clippy::borrowed_box)]
UNCOV
233
    pub(crate) fn run_one_pass<B: BurnchainHeaderReader>(
×
UNCOV
234
        &mut self,
×
UNCOV
235
        indexer: &B,
×
UNCOV
236
        dns_client_opt: Option<&mut DNSClient>,
×
UNCOV
237
        event_dispatcher: &EventDispatcher,
×
UNCOV
238
        cost_estimator: &Box<dyn CostEstimator>,
×
UNCOV
239
        cost_metric: &Box<dyn CostMetric>,
×
UNCOV
240
        fee_estimator: Option<&Box<dyn FeeEstimator>>,
×
UNCOV
241
    ) -> bool {
×
242
        // initial block download?
UNCOV
243
        let ibd = self.globals.sync_comms.get_ibd();
×
UNCOV
244
        let download_backpressure = self
×
UNCOV
245
            .results_with_data
×
UNCOV
246
            .as_ref()
×
UNCOV
247
            .map(|res| {
×
UNCOV
248
                if let RelayerDirective::HandleNetResult(netres) = &res {
×
UNCOV
249
                    netres.has_block_data_to_store()
×
250
                } else {
251
                    false
×
252
                }
UNCOV
253
            })
×
UNCOV
254
            .unwrap_or(false);
×
255

UNCOV
256
        let poll_ms = if !download_backpressure && self.net.has_more_downloads() {
×
257
            // keep getting those blocks -- drive the downloader state-machine
UNCOV
258
            debug!(
×
259
                "P2P: backpressure: {download_backpressure}, more downloads: {}",
260
                self.net.has_more_downloads()
×
261
            );
UNCOV
262
            1
×
263
        } else {
UNCOV
264
            self.poll_timeout
×
265
        };
266

UNCOV
267
        self.refresh_stackerdb();
×
268

269
        // do one pass
UNCOV
270
        let p2p_res = {
×
271
            // NOTE: handler_args must be created such that it outlives the inner net.run() call and
272
            // doesn't ref anything within p2p_thread.
UNCOV
273
            let handler_args = RPCHandlerArgs {
×
UNCOV
274
                exit_at_block_height: self.config.burnchain.process_exit_at_block_height,
×
UNCOV
275
                genesis_chainstate_hash: Sha256Sum::from_hex(stx_genesis::GENESIS_CHAINSTATE_HASH)
×
UNCOV
276
                    .unwrap(),
×
UNCOV
277
                event_observer: Some(event_dispatcher),
×
UNCOV
278
                cost_estimator: Some(cost_estimator.as_ref()),
×
UNCOV
279
                cost_metric: Some(cost_metric.as_ref()),
×
UNCOV
280
                fee_estimator: fee_estimator.map(|boxed_estimator| boxed_estimator.as_ref()),
×
UNCOV
281
                coord_comms: Some(&self.globals.coord_comms),
×
282
            };
UNCOV
283
            self.net.run(
×
UNCOV
284
                indexer,
×
UNCOV
285
                &self.sortdb,
×
UNCOV
286
                &mut self.chainstate,
×
UNCOV
287
                &mut self.mempool,
×
UNCOV
288
                dns_client_opt,
×
UNCOV
289
                download_backpressure,
×
UNCOV
290
                ibd,
×
UNCOV
291
                poll_ms,
×
UNCOV
292
                &handler_args,
×
UNCOV
293
                self.config.node.txindex,
×
294
            )
295
        };
UNCOV
296
        match p2p_res {
×
UNCOV
297
            Ok(network_result) => {
×
UNCOV
298
                if self.num_p2p_state_machine_passes < network_result.num_state_machine_passes {
×
UNCOV
299
                    // p2p state-machine did a full pass. Notify anyone listening.
×
UNCOV
300
                    self.globals.sync_comms.notify_p2p_state_pass();
×
UNCOV
301
                    self.num_p2p_state_machine_passes = network_result.num_state_machine_passes;
×
UNCOV
302
                }
×
303

UNCOV
304
                if self.num_inv_sync_passes < network_result.num_inv_sync_passes {
×
UNCOV
305
                    // inv-sync state-machine did a full pass. Notify anyone listening.
×
UNCOV
306
                    self.globals.sync_comms.notify_inv_sync_pass();
×
UNCOV
307
                    self.num_inv_sync_passes = network_result.num_inv_sync_passes;
×
UNCOV
308
                }
×
309

UNCOV
310
                if self.num_download_passes < network_result.num_download_passes {
×
UNCOV
311
                    // download state-machine did a full pass.  Notify anyone listening.
×
UNCOV
312
                    self.globals.sync_comms.notify_download_pass();
×
UNCOV
313
                    self.num_download_passes = network_result.num_download_passes;
×
UNCOV
314
                }
×
315

UNCOV
316
                self.last_burn_block_height = network_result.burn_height;
×
UNCOV
317
                if let Some(res) = self.results_with_data.take() {
×
UNCOV
318
                    if let RelayerDirective::HandleNetResult(netres) = res {
×
UNCOV
319
                        let new_res = netres.update(network_result);
×
UNCOV
320
                        self.results_with_data = Some(RelayerDirective::HandleNetResult(new_res));
×
UNCOV
321
                    }
×
UNCOV
322
                } else {
×
UNCOV
323
                    self.results_with_data =
×
UNCOV
324
                        Some(RelayerDirective::HandleNetResult(network_result));
×
UNCOV
325
                }
×
326

UNCOV
327
                self.globals.raise_initiative(
×
UNCOV
328
                    "PeerThread::run_one_pass() with data-bearing network result".to_string(),
×
329
                );
330
            }
UNCOV
331
            Err(e) => {
×
332
                // this is only reachable if the network is not instantiated correctly --
333
                // i.e. you didn't connect it
UNCOV
334
                panic!("P2P: Failed to process network dispatch: {e:?}");
×
335
            }
336
        };
337

UNCOV
338
        if let Some(next_result) = self.results_with_data.take() {
×
339
            // have blocks, microblocks, and/or transactions (don't care about anything else),
340
            // or a directive to mine microblocks
UNCOV
341
            self.globals.raise_initiative(
×
UNCOV
342
                "PeerThread::run_one_pass() with backlogged network results".to_string(),
×
343
            );
UNCOV
344
            if let Err(e) = self.globals.relay_send.try_send(next_result) {
×
UNCOV
345
                debug!(
×
346
                    "P2P: {:?}: download backpressure detected",
347
                    &self.net.local_peer,
×
348
                );
UNCOV
349
                match e {
×
UNCOV
350
                    TrySendError::Full(directive) => {
×
UNCOV
351
                        // don't lose this data -- just try it again
×
UNCOV
352
                        self.results_with_data = Some(directive);
×
UNCOV
353
                    }
×
354
                    TrySendError::Disconnected(_) => {
UNCOV
355
                        info!("P2P: Relayer hang up with p2p channel");
×
UNCOV
356
                        self.globals.signal_stop();
×
UNCOV
357
                        return false;
×
358
                    }
359
                }
360
            } else {
UNCOV
361
                debug!("P2P: Dispatched result to Relayer!",);
×
362
            }
363
        }
×
364

UNCOV
365
        true
×
UNCOV
366
    }
×
367
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc