• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 25903914664-1

15 May 2026 06:28AM UTC coverage: 47.122% (-38.8%) from 85.959%
25903914664-1

Pull #7199

github

94e391
web-flow
Merge 109f2828c into 1c7b8e6ac
Pull Request #7199: Feat: L1 and L2 early unlocks, updating signer

103343 of 219309 relevant lines covered (47.12%)

12880462.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.39
/stackslib/src/net/stackerdb/sync.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
use std::collections::{HashMap, HashSet};
18
use std::mem;
19

20
use clarity::vm::types::QualifiedContractIdentifier;
21
use rand::prelude::SliceRandom;
22
use rand::{thread_rng, Rng};
23
use stacks_common::types::chainstate::ConsensusHash;
24
use stacks_common::util::get_epoch_time_secs;
25

26
use crate::net::db::PeerDB;
27
use crate::net::neighbors::comms::ToNeighborKey;
28
use crate::net::neighbors::NeighborComms;
29
use crate::net::p2p::PeerNetwork;
30
use crate::net::stackerdb::{
31
    StackerDBConfig, StackerDBSync, StackerDBSyncResult, StackerDBSyncState, StackerDBs,
32
};
33
use crate::net::{
34
    Error as net_error, NackErrorCodes, NeighborAddress, StackerDBChunkData, StackerDBChunkInvData,
35
    StackerDBGetChunkData, StackerDBGetChunkInvData, StackerDBPushChunkData, StacksMessageType,
36
};
37

38
const MAX_CHUNKS_IN_FLIGHT: usize = 6;
39
const MAX_DB_NEIGHBORS: usize = 32;
40

41
impl<NC: NeighborComms> StackerDBSync<NC> {
42
    pub fn new(
7,449✔
43
        smart_contract: QualifiedContractIdentifier,
7,449✔
44
        config: &StackerDBConfig,
7,449✔
45
        comms: NC,
7,449✔
46
        stackerdbs: StackerDBs,
7,449✔
47
    ) -> StackerDBSync<NC> {
7,449✔
48
        let mut dbsync = StackerDBSync {
7,449✔
49
            state: StackerDBSyncState::ConnectBegin,
7,449✔
50
            rc_consensus_hash: None,
7,449✔
51
            smart_contract_id: smart_contract,
7,449✔
52
            num_slots: config.num_slots() as usize,
7,449✔
53
            write_freq: config.write_freq,
7,449✔
54
            chunk_invs: HashMap::new(),
7,449✔
55
            chunk_fetch_priorities: vec![],
7,449✔
56
            chunk_push_priorities: vec![],
7,449✔
57
            chunk_push_receipts: HashMap::new(),
7,449✔
58
            next_chunk_fetch_priority: 0,
7,449✔
59
            next_chunk_push_priority: 0,
7,449✔
60
            expected_versions: vec![],
7,449✔
61
            downloaded_chunks: HashMap::new(),
7,449✔
62
            replicas: HashSet::new(),
7,449✔
63
            connected_replicas: HashSet::new(),
7,449✔
64
            comms,
7,449✔
65
            stackerdbs,
7,449✔
66
            request_capacity: MAX_CHUNKS_IN_FLIGHT,
7,449✔
67
            max_neighbors: MAX_DB_NEIGHBORS,
7,449✔
68
            total_stored: 0,
7,449✔
69
            total_pushed: 0,
7,449✔
70
            last_run_ts: 0,
7,449✔
71
            need_resync: false,
7,449✔
72
            stale_inv: false,
7,449✔
73
            stale_neighbors: HashSet::new(),
7,449✔
74
            num_connections: 0,
7,449✔
75
            num_attempted_connections: 0,
7,449✔
76
            rounds: 0,
7,449✔
77
            push_round: 0,
7,449✔
78
            last_eviction_time: get_epoch_time_secs(),
7,449✔
79
        };
7,449✔
80
        dbsync.reset(None, config);
7,449✔
81
        dbsync
7,449✔
82
    }
7,449✔
83

84
    /// Find stackerdb replicas and apply filtering rules
85
    fn find_qualified_replicas(
18,929,646✔
86
        &self,
18,929,646✔
87
        network: &PeerNetwork,
18,929,646✔
88
    ) -> Result<HashSet<NeighborAddress>, net_error> {
18,929,646✔
89
        let mut found = HashSet::new();
18,929,646✔
90
        let mut min_age =
18,929,646✔
91
            get_epoch_time_secs().saturating_sub(network.get_connection_opts().max_neighbor_age);
18,929,646✔
92

93
        let local_naddr = network.get_local_peer().to_neighbor_addr();
18,929,646✔
94

95
        while found.len() < self.max_neighbors {
37,859,256✔
96
            let peers_iter = PeerDB::find_stacker_db_replicas(
37,859,256✔
97
                network.peerdb_conn(),
37,859,256✔
98
                network.get_local_peer().network_id,
37,859,256✔
99
                &self.smart_contract_id,
37,859,256✔
100
                min_age,
37,859,256✔
101
                self.max_neighbors,
37,859,256✔
102
            )?
×
103
            .into_iter()
37,859,256✔
104
            .map(|neighbor| {
37,859,256✔
105
                (
8,833,986✔
106
                    NeighborAddress::from_neighbor(&neighbor),
8,833,986✔
107
                    neighbor.last_contact_time,
8,833,986✔
108
                )
8,833,986✔
109
            })
8,833,986✔
110
            .filter(|(naddr, _)| {
37,859,256✔
111
                if naddr.addrbytes.is_anynet() {
8,833,986✔
112
                    return false;
×
113
                }
8,833,986✔
114
                if naddr.public_key_hash == local_naddr.public_key_hash {
8,833,986✔
115
                    // don't talk to us by another address
116
                    return false;
×
117
                }
8,833,986✔
118
                if !network.get_connection_opts().private_neighbors
8,833,986✔
119
                    && naddr.addrbytes.is_in_private_range()
×
120
                {
121
                    return false;
×
122
                }
8,833,986✔
123
                true
8,833,986✔
124
            });
8,833,986✔
125

126
            for (peer, last_contact) in peers_iter {
37,859,256✔
127
                found.insert(peer);
8,833,986✔
128
                if found.len() >= self.max_neighbors {
8,833,986✔
129
                    break;
×
130
                }
8,833,986✔
131
                min_age = min_age.min(last_contact);
8,833,986✔
132
            }
133

134
            // search for older neighbors
135
            if min_age > 1 {
37,859,256✔
136
                min_age = 1;
18,929,628✔
137
            } else if min_age <= 1 {
18,929,628✔
138
                break;
18,929,646✔
139
            }
3✔
140
        }
141
        Ok(found)
18,929,646✔
142
    }
18,929,646✔
143

144
    /// Calculate the new set of replicas to contact.
145
    /// This is the same as the set that was connected on the last sync, plus any
146
    /// config hints and discovered nodes from the DB.
147
    fn find_new_replicas(
12,286,986✔
148
        &self,
12,286,986✔
149
        mut connected_replicas: HashSet<NeighborAddress>,
12,286,986✔
150
        network: Option<&PeerNetwork>,
12,286,986✔
151
        config: &StackerDBConfig,
12,286,986✔
152
    ) -> Result<HashSet<NeighborAddress>, net_error> {
12,286,986✔
153
        // keep all connected replicas, and replenish from config hints and the DB as needed
154
        let mut peers = config.hint_replicas.clone();
12,286,986✔
155
        if let Some(network) = network {
12,286,986✔
156
            let extra_peers = self.find_qualified_replicas(network)?;
12,226,617✔
157
            peers.extend(extra_peers);
12,226,617✔
158
        }
60,369✔
159

160
        peers.shuffle(&mut thread_rng());
12,286,986✔
161

162
        for peer in peers {
12,286,986✔
163
            if connected_replicas.len() >= config.max_neighbors {
4,344,462✔
164
                break;
18✔
165
            }
3,532,762✔
166
            connected_replicas.insert(peer);
3,532,762✔
167
        }
168
        Ok(connected_replicas)
12,286,986✔
169
    }
12,286,986✔
170

171
    /// Reset this state machine, and get the StackerDBSyncResult with newly-obtained chunk data
172
    /// and newly-learned information about connection statistics
173
    pub fn reset(
12,286,986✔
174
        &mut self,
12,286,986✔
175
        network: Option<&PeerNetwork>,
12,286,986✔
176
        config: &StackerDBConfig,
12,286,986✔
177
    ) -> StackerDBSyncResult {
12,286,986✔
178
        debug!(
12,286,986✔
179
            "{}: Reset with config {:?}",
180
            &self.smart_contract_id, config
×
181
        );
182
        let mut chunks = vec![];
12,286,986✔
183
        let downloaded_chunks = mem::replace(&mut self.downloaded_chunks, HashMap::new());
12,286,986✔
184
        for (_, mut data) in downloaded_chunks.into_iter() {
12,286,986✔
185
            chunks.append(&mut data);
50,157✔
186
        }
50,157✔
187

188
        let chunk_invs = mem::replace(&mut self.chunk_invs, HashMap::new());
12,286,986✔
189
        let result = StackerDBSyncResult {
12,286,986✔
190
            contract_id: self.smart_contract_id.clone(),
12,286,986✔
191
            chunk_invs,
12,286,986✔
192
            chunks_to_store: chunks,
12,286,986✔
193
            stale: std::mem::replace(&mut self.stale_neighbors, HashSet::new()),
12,286,986✔
194
            num_connections: self.num_connections,
12,286,986✔
195
            num_attempted_connections: self.num_attempted_connections,
12,286,986✔
196
        };
12,286,986✔
197

198
        // keep all connected replicas, and replenish from config hints and the DB as needed
199
        let connected_replicas = mem::replace(&mut self.connected_replicas, HashSet::new());
12,286,986✔
200
        let next_connected_replicas =
12,286,986✔
201
            if let Ok(new_replicas) = self.find_new_replicas(connected_replicas, network, config) {
12,286,986✔
202
                new_replicas
12,286,986✔
203
            } else {
204
                self.replicas.clone()
×
205
            };
206

207
        self.replicas = next_connected_replicas;
12,286,986✔
208

209
        self.chunk_fetch_priorities.clear();
12,286,986✔
210
        self.chunk_push_priorities.clear();
12,286,986✔
211
        self.next_chunk_fetch_priority = 0;
12,286,986✔
212
        self.next_chunk_push_priority = 0;
12,286,986✔
213
        self.chunk_push_receipts.clear();
12,286,986✔
214
        self.expected_versions.clear();
12,286,986✔
215
        self.downloaded_chunks.clear();
12,286,986✔
216

217
        // reset comms, but keep all connected replicas pinned.
218
        // Randomly evict one every so often.
219
        self.comms.reset();
12,286,986✔
220
        if let Some(network) = network {
12,286,986✔
221
            let mut eviction_index = None;
12,226,617✔
222
            if self.last_eviction_time + 60 < get_epoch_time_secs() {
12,226,617✔
223
                self.last_eviction_time = get_epoch_time_secs();
149,445✔
224
                if !self.replicas.is_empty() {
149,445✔
225
                    eviction_index = Some(thread_rng().gen_range(0..self.replicas.len()));
57,105✔
226
                }
139,482✔
227
            }
12,077,172✔
228

229
            let remove_naddr = eviction_index.and_then(|idx| {
12,226,617✔
230
                let removed = self.replicas.iter().nth(idx).cloned();
57,105✔
231
                if let Some(naddr) = removed.as_ref() {
57,105✔
232
                    debug!(
57,105✔
233
                        "{:?}: {}: don't reuse connection for replica {:?}",
234
                        network.get_local_peer(),
×
235
                        &self.smart_contract_id,
×
236
                        &naddr,
×
237
                    );
238
                }
×
239
                removed
57,105✔
240
            });
57,105✔
241

242
            if let Some(naddr) = remove_naddr {
12,226,617✔
243
                self.replicas.remove(&naddr);
57,123✔
244
            }
12,169,494✔
245

246
            // retain the remaining replica connections
247
            for naddr in self.replicas.iter() {
9,122,725✔
248
                if let Some(event_id) = network.get_event_id(&naddr.to_neighbor_key(network)) {
3,489,022✔
249
                    self.comms.pin_connection(event_id);
3,479,671✔
250
                    debug!(
3,479,671✔
251
                        "{:?}: {}: reuse connection for replica {:?} on event {}",
252
                        network.get_local_peer(),
×
253
                        &self.smart_contract_id,
×
254
                        &naddr,
×
255
                        event_id
256
                    );
257
                }
42,327✔
258
            }
259
        }
60,369✔
260

261
        // reload from config
262
        self.num_slots = config.num_slots() as usize;
12,286,986✔
263
        self.write_freq = config.write_freq;
12,286,986✔
264

265
        self.need_resync = false;
12,286,986✔
266
        self.stale_inv = false;
12,286,986✔
267
        self.last_run_ts = get_epoch_time_secs();
12,286,986✔
268

269
        self.state = StackerDBSyncState::ConnectBegin;
12,286,986✔
270
        self.num_connections = 0;
12,286,986✔
271
        self.num_attempted_connections = 0;
12,286,986✔
272
        self.rounds += 1;
12,286,986✔
273
        self.rc_consensus_hash = None;
12,286,986✔
274
        result
12,286,986✔
275
    }
12,286,986✔
276

277
    /// Get the set of connection IDs in use
278
    pub fn get_pinned_connections(&self) -> &HashSet<usize> {
79,253,218✔
279
        self.comms.get_pinned_connections()
79,253,218✔
280
    }
79,253,218✔
281

282
    /// Unpin and remove a connected replica by naddr
283
    pub fn unpin_connected_replica(&mut self, network: &PeerNetwork, naddr: &NeighborAddress) {
243✔
284
        let nk = naddr.to_neighbor_key(network);
243✔
285
        if let Some(event_id) = network.get_event_id(&nk) {
243✔
286
            self.comms.unpin_connection(event_id);
×
287
        }
243✔
288
        self.connected_replicas.remove(naddr);
243✔
289
    }
243✔
290

291
    /// Make a chunk inv request
292
    pub fn make_getchunkinv(&self, rc_consensus_hash: &ConsensusHash) -> StacksMessageType {
3,917,052✔
293
        StacksMessageType::StackerDBGetChunkInv(StackerDBGetChunkInvData {
3,917,052✔
294
            contract_id: self.smart_contract_id.clone(),
3,917,052✔
295
            rc_consensus_hash: rc_consensus_hash.clone(),
3,917,052✔
296
        })
3,917,052✔
297
    }
3,917,052✔
298

299
    /// Given the downloaded set of chunk inventories, identify:
300
    /// * which chunks we need to fetch, because they're newer than ours.
301
    /// * what order to fetch chunks in, in rarest-first order
302
    /// Returns a list of (chunk requests, list of neighbors that can service them), which is
303
    /// ordered from rarest chunk to most-common chunk.
304
    pub fn make_chunk_request_schedule(
3,890,592✔
305
        &self,
3,890,592✔
306
        network: &PeerNetwork,
3,890,592✔
307
        local_slot_versions_opt: Option<Vec<u32>>,
3,890,592✔
308
    ) -> Result<Vec<(StackerDBGetChunkData, Vec<NeighborAddress>)>, net_error> {
3,890,592✔
309
        let rc_consensus_hash = network.get_chain_view().rc_consensus_hash.clone();
3,890,592✔
310
        let local_slot_versions = if let Some(local_slot_versions) = local_slot_versions_opt {
3,890,592✔
311
            local_slot_versions
59,472✔
312
        } else {
313
            self.stackerdbs.get_slot_versions(&self.smart_contract_id)?
3,831,120✔
314
        };
315

316
        let local_write_timestamps = self
3,890,592✔
317
            .stackerdbs
3,890,592✔
318
            .get_slot_write_timestamps(&self.smart_contract_id)?;
3,890,592✔
319

320
        if local_slot_versions.len() != local_write_timestamps.len() {
3,890,592✔
321
            let msg = format!("{}: Local slot versions ({}) out of sync with DB slot versions ({}); abandoning sync and trying again", &self.smart_contract_id, local_slot_versions.len(), local_write_timestamps.len());
×
322
            warn!("{}", &msg);
×
323
            return Err(net_error::Transient(msg));
×
324
        }
3,890,592✔
325

326
        let mut need_chunks: HashMap<usize, (StackerDBGetChunkData, Vec<NeighborAddress>)> =
3,890,592✔
327
            HashMap::new();
3,890,592✔
328
        let now = get_epoch_time_secs();
3,890,592✔
329

330
        // who has data we need?
331
        for ((i, local_version), write_ts) in local_slot_versions
9,703,008✔
332
            .iter()
3,890,592✔
333
            .enumerate()
3,890,592✔
334
            .zip(local_write_timestamps.iter())
3,890,592✔
335
        {
336
            if self.write_freq > 0 && write_ts + self.write_freq > now {
9,579,807✔
337
                debug!(
×
338
                    "{:?}: {}: Chunk {} was written too frequently ({} + {} > {}) in {}, so will not fetch chunk",
339
                    network.get_local_peer(),
×
340
                    &self.smart_contract_id,
×
341
                    i,
342
                    write_ts,
343
                    self.write_freq,
344
                    now,
345
                    &self.smart_contract_id,
×
346
                );
347
                continue;
×
348
            }
9,579,807✔
349

350
            for (naddr, chunk_inv) in self.chunk_invs.iter() {
9,579,807✔
351
                if chunk_inv.slot_versions.len() != local_slot_versions.len() {
8,537,265✔
352
                    // remote peer and our DB are out of sync, so just skip this
353
                    continue;
468✔
354
                }
8,536,797✔
355

356
                let Some(remote_version) = chunk_inv.slot_versions.get(i) else {
8,536,797✔
357
                    // remote peer isn't tracking this chunk
358
                    continue;
×
359
                };
360

361
                if local_version >= remote_version {
8,536,797✔
362
                    // remote peer has same view as local peer, or stale
363
                    continue;
8,456,616✔
364
                }
80,181✔
365

366
                let (request, available) = if let Some(x) = need_chunks.get_mut(&i) {
80,181✔
367
                    // someone has this chunk already
368
                    x
×
369
                } else {
370
                    // haven't seen anyone with this data yet.
371
                    // Add a record for it
372
                    need_chunks.insert(
80,181✔
373
                        i,
80,181✔
374
                        (
80,181✔
375
                            StackerDBGetChunkData {
80,181✔
376
                                contract_id: self.smart_contract_id.clone(),
80,181✔
377
                                rc_consensus_hash: rc_consensus_hash.clone(),
80,181✔
378
                                slot_id: i as u32,
80,181✔
379
                                slot_version: *remote_version,
80,181✔
380
                            },
80,181✔
381
                            vec![naddr.clone()],
80,181✔
382
                        ),
80,181✔
383
                    );
384
                    continue;
80,181✔
385
                };
386

387
                if request.slot_version < *remote_version {
×
388
                    // this peer has a newer view
×
389
                    available.clear();
×
390
                    available.push(naddr.clone());
×
391
                    *request = StackerDBGetChunkData {
×
392
                        contract_id: self.smart_contract_id.clone(),
×
393
                        rc_consensus_hash: rc_consensus_hash.clone(),
×
394
                        slot_id: i as u32,
×
395
                        slot_version: *remote_version,
×
396
                    };
×
397
                } else if request.slot_version == *remote_version {
×
398
                    // this peer has the same view as a prior peer.
×
399
                    // just track how many times we see this
×
400
                    available.push(naddr.clone());
×
401
                }
×
402
            }
403
        }
404

405
        // prioritize requests by rarest-chunk-first order, but choose neighbors in random order
406
        let mut schedule: Vec<_> = need_chunks
3,890,592✔
407
            .into_iter()
3,890,592✔
408
            .map(|(_, (stackerdb_getchunkdata, mut neighbors))| {
3,890,592✔
409
                neighbors.shuffle(&mut thread_rng());
80,181✔
410
                (stackerdb_getchunkdata, neighbors)
80,181✔
411
            })
80,181✔
412
            .collect();
3,890,592✔
413

414
        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
3,890,592✔
415
        schedule.reverse();
3,890,592✔
416

417
        debug!(
3,890,592✔
418
            "{:?}: {}: Will request up to {} chunks. Schedule: {:?}",
419
            network.get_local_peer(),
×
420
            &self.smart_contract_id,
×
421
            &schedule.len(),
×
422
            &schedule
×
423
        );
424
        Ok(schedule)
3,890,592✔
425
    }
3,890,592✔
426

427
    /// Given the downloaded set of chunk inventories, identify:
428
    /// * which chunks we need to push, because we have them and the neighbor does not
429
    /// * what order to push them in, in rarest-first order
430
    pub fn make_chunk_push_schedule(
3,829,959✔
431
        &self,
3,829,959✔
432
        network: &PeerNetwork,
3,829,959✔
433
    ) -> Result<Vec<(StackerDBPushChunkData, Vec<NeighborAddress>)>, net_error> {
3,829,959✔
434
        let rc_consensus_hash = network.get_chain_view().rc_consensus_hash.clone();
3,829,959✔
435
        let local_slot_versions = self.stackerdbs.get_slot_versions(&self.smart_contract_id)?;
3,829,959✔
436

437
        let mut need_chunks: HashMap<usize, (StackerDBPushChunkData, Vec<NeighborAddress>)> =
3,829,959✔
438
            HashMap::new();
3,829,959✔
439

440
        // who needs data we can serve?
441
        for (i, local_version) in local_slot_versions.iter().enumerate() {
9,407,835✔
442
            let mut local_chunk = None;
9,279,315✔
443
            for (naddr, chunk_inv) in self.chunk_invs.iter() {
9,279,315✔
444
                if chunk_inv.slot_versions.len() != local_slot_versions.len() {
8,236,773✔
445
                    // remote peer and our DB are out of sync, so just skip this
446
                    continue;
468✔
447
                }
8,236,305✔
448

449
                let Some(remote_version) = chunk_inv.slot_versions.get(i) else {
8,236,305✔
450
                    // remote peer isn't tracking this chunk
451
                    continue;
×
452
                };
453

454
                if local_version <= remote_version {
8,236,305✔
455
                    // remote peer has same or newer view than local peer
456
                    continue;
8,129,538✔
457
                }
106,767✔
458

459
                if local_chunk.is_none() {
106,767✔
460
                    let chunk_data = if let Some(chunk_data) = self.stackerdbs.get_chunk(
106,767✔
461
                        &self.smart_contract_id,
106,767✔
462
                        i as u32,
106,767✔
463
                        *local_version,
106,767✔
464
                    )? {
×
465
                        chunk_data
106,767✔
466
                    } else {
467
                        // we don't have this chunk
468
                        break;
×
469
                    };
470
                    local_chunk = Some(StackerDBPushChunkData {
106,767✔
471
                        contract_id: self.smart_contract_id.clone(),
106,767✔
472
                        rc_consensus_hash: rc_consensus_hash.clone(),
106,767✔
473
                        chunk_data,
106,767✔
474
                    });
106,767✔
475
                }
×
476

477
                let our_chunk = if let Some(chunk) = local_chunk.as_ref() {
106,767✔
478
                    chunk
106,767✔
479
                } else {
480
                    // we don't have this chunk
481
                    break;
×
482
                };
483

484
                // replicate with probability 1/num-outbound-replicas
485
                let do_replicate = if chunk_inv.num_outbound_replicas == 0 {
106,767✔
486
                    true
27,099✔
487
                } else {
488
                    thread_rng().gen::<u32>() % chunk_inv.num_outbound_replicas == 0
79,668✔
489
                };
490

491
                debug!(
106,767✔
492
                    "{:?}: {}: Can push chunk StackerDBChunk(id={},ver={}) to {}. Replicate? {}",
493
                    &network.get_local_peer(),
×
494
                    &self.smart_contract_id,
×
495
                    our_chunk.chunk_data.slot_id,
496
                    our_chunk.chunk_data.slot_version,
497
                    &naddr,
×
498
                    do_replicate
499
                );
500

501
                if !do_replicate {
106,767✔
502
                    continue;
×
503
                }
106,767✔
504

505
                if let Some((_, receivers)) = need_chunks.get_mut(&i) {
106,767✔
506
                    // someone needs this chunk already
×
507
                    receivers.push(naddr.clone());
×
508
                } else {
106,767✔
509
                    // haven't seen anyone that needs this data yet.
106,767✔
510
                    // Add a record for it.
106,767✔
511
                    need_chunks.insert(i, (our_chunk.clone(), vec![naddr.clone()]));
106,767✔
512
                };
106,767✔
513
            }
514
        }
515

516
        // prioritize requests by rarest-chunk-first order.
517
        // no need to randomize; we'll pick recipients at random
518
        let mut schedule: Vec<_> = need_chunks
3,829,959✔
519
            .into_iter()
3,829,959✔
520
            .map(|(_, (stackerdb_chunkdata, neighbors))| (stackerdb_chunkdata, neighbors))
3,829,959✔
521
            .collect();
3,829,959✔
522

523
        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
3,829,959✔
524
        debug!(
3,829,959✔
525
            "{:?}: {}: Will push up to {} chunks",
526
            network.get_local_peer(),
×
527
            &self.smart_contract_id,
×
528
            &schedule.len(),
×
529
        );
530
        Ok(schedule)
3,829,959✔
531
    }
3,829,959✔
532

533
    /// Validate a downloaded chunk
534
    pub fn validate_downloaded_chunk(
78,885✔
535
        &self,
78,885✔
536
        network: &PeerNetwork,
78,885✔
537
        config: &StackerDBConfig,
78,885✔
538
        data: &StackerDBChunkData,
78,885✔
539
    ) -> Result<bool, net_error> {
78,885✔
540
        // validate -- must be a valid chunk
541
        if !network.validate_received_chunk(
78,885✔
542
            &self.smart_contract_id,
78,885✔
543
            config,
78,885✔
544
            data,
78,885✔
545
            &self.expected_versions,
78,885✔
546
        )? {
×
547
            return Ok(false);
×
548
        }
78,885✔
549

550
        // no need to validate the timestamp, because we already skipped requesting it if it was
551
        // written too recently.
552

553
        Ok(true)
78,885✔
554
    }
78,885✔
555

556
    /// Store a downloaded chunk to RAM, and update bookkeeping
557
    pub fn add_downloaded_chunk(&mut self, naddr: NeighborAddress, data: StackerDBChunkData) {
78,885✔
558
        let slot_id = data.slot_id;
78,885✔
559
        let _slot_version = data.slot_version;
78,885✔
560

561
        if let Some(data_list) = self.downloaded_chunks.get_mut(&naddr) {
78,885✔
562
            data_list.push(data);
28,728✔
563
        } else {
50,724✔
564
            self.downloaded_chunks.insert(naddr.clone(), vec![data]);
50,157✔
565
        }
50,157✔
566

567
        self.chunk_fetch_priorities
78,885✔
568
            .retain(|(chunk, ..)| chunk.slot_id != slot_id);
117,576✔
569

570
        if !self.chunk_fetch_priorities.is_empty() {
78,885✔
571
            let next_chunk_fetch_priority =
29,322✔
572
                self.next_chunk_fetch_priority % self.chunk_fetch_priorities.len();
29,322✔
573
            self.next_chunk_fetch_priority = next_chunk_fetch_priority;
29,322✔
574
        }
50,616✔
575

576
        self.total_stored += 1;
78,885✔
577
    }
78,885✔
578

579
    /// Update bookkeeping about which chunks we have pushed.
580
    /// Stores the new chunk inventory to RAM.
581
    /// Returns true if the inventory changed (indicating that we need to resync)
582
    /// Returns false otherwise
583
    pub fn add_pushed_chunk(
59,472✔
584
        &mut self,
59,472✔
585
        _network: &PeerNetwork,
59,472✔
586
        naddr: NeighborAddress,
59,472✔
587
        new_inv: StackerDBChunkInvData,
59,472✔
588
        slot_id: u32,
59,472✔
589
    ) -> bool {
59,472✔
590
        // safety (should already be checked) -- don't accept if the size is wrong
591
        if new_inv.slot_versions.len() != self.num_slots {
59,472✔
592
            return false;
×
593
        }
59,472✔
594

595
        let need_resync = if let Some(old_inv) = self.chunk_invs.get(&naddr) {
59,472✔
596
            let mut resync = false;
59,472✔
597
            for (old_slot_id, (old_version, new_version)) in old_inv
145,674✔
598
                .slot_versions
59,472✔
599
                .iter()
59,472✔
600
                .zip(new_inv.slot_versions.iter())
59,472✔
601
                .enumerate()
59,472✔
602
            {
603
                if old_version < new_version {
145,674✔
604
                    // remote peer indicated that it has a newer version of this chunk.
605
                    debug!(
59,472✔
606
                        "{:?}: {}: peer {:?} has a newer version of slot {} ({} < {})",
607
                        _network.get_local_peer(),
×
608
                        &self.smart_contract_id,
×
609
                        &naddr,
×
610
                        old_slot_id,
611
                        old_version,
612
                        new_version,
613
                    );
614
                    resync = true;
59,472✔
615
                    break;
59,472✔
616
                }
86,202✔
617
            }
618
            resync
59,472✔
619
        } else {
620
            false
×
621
        };
622

623
        self.chunk_invs.insert(naddr, new_inv);
59,472✔
624

625
        self.chunk_push_priorities
59,472✔
626
            .retain(|(chunk, ..)| chunk.chunk_data.slot_id != slot_id);
106,038✔
627

628
        if !self.chunk_push_priorities.is_empty() {
59,472✔
629
            let next_chunk_push_priority =
33,912✔
630
                self.next_chunk_push_priority % self.chunk_push_priorities.len();
33,912✔
631
            self.next_chunk_push_priority = next_chunk_push_priority;
33,912✔
632
        }
37,503✔
633

634
        self.total_pushed += 1;
59,472✔
635
        need_resync
59,472✔
636
    }
59,472✔
637

638
    /// Ask inbound neighbors who replicate this DB for their chunk inventories.
639
    /// Don't send them a message if they're also outbound.
640
    /// Logs errors but does not return them.
641
    fn send_getchunkinv_to_inbound_neighbors(
3,894,948✔
642
        &mut self,
3,894,948✔
643
        network: &mut PeerNetwork,
3,894,948✔
644
        already_sent: &[NeighborAddress],
3,894,948✔
645
    ) {
3,894,948✔
646
        let sent_naddr_set: HashSet<_> = already_sent.iter().collect();
3,894,948✔
647
        let mut to_send = vec![];
3,894,948✔
648
        for event_id in network.iter_peer_event_ids() {
7,406,991✔
649
            let convo = if let Some(c) = network.get_p2p_convo(*event_id) {
7,406,991✔
650
                c
7,406,991✔
651
            } else {
652
                continue;
×
653
            };
654

655
            // only want inbound peers that replicate this DB
656
            if convo.is_outbound() {
7,406,991✔
657
                continue;
3,894,462✔
658
            }
3,512,529✔
659
            if !convo.replicates_stackerdb(&self.smart_contract_id) {
3,512,529✔
660
                continue;
2,194,551✔
661
            }
1,317,978✔
662

663
            let naddr = convo.to_neighbor_address();
1,317,978✔
664
            if sent_naddr_set.contains(&naddr) {
1,317,978✔
665
                continue;
21,861✔
666
            }
1,296,117✔
667

668
            let has_reciprocal_outbound = network
1,296,117✔
669
                .get_pubkey_events(&naddr.public_key_hash)
1,296,117✔
670
                .iter()
1,296,117✔
671
                .find(|event_id| {
1,896,048✔
672
                    if let Some(convo) = network.get_p2p_convo(**event_id) {
1,896,048✔
673
                        if !convo.is_outbound() {
1,896,048✔
674
                            return false;
599,931✔
675
                        }
1,296,117✔
676
                        let other_naddr = convo.to_neighbor_address();
1,296,117✔
677
                        if sent_naddr_set.contains(&other_naddr) {
1,296,117✔
678
                            return true;
1,290,771✔
679
                        }
5,346✔
680
                    }
×
681
                    return false;
5,346✔
682
                })
1,896,048✔
683
                .is_some();
1,296,117✔
684

685
            if has_reciprocal_outbound {
1,296,117✔
686
                // this inbound neighbor is also connected to us as an outbound neighbor, and we
687
                // already sent it a getchunkinv request
688
                continue;
1,290,771✔
689
            }
5,346✔
690

691
            let chunks_req = self.make_getchunkinv(&network.get_chain_view().rc_consensus_hash);
5,346✔
692
            to_send.push((naddr, chunks_req));
5,346✔
693
        }
694

695
        for (naddr, chunks_req) in to_send.into_iter() {
3,894,948✔
696
            debug!("{:?}: {}: send_getchunksinv_to_inbound_neighbors: Send StackerDBGetChunkInv at {} to inbound {:?}", network.get_local_peer(), &self.smart_contract_id, &network.get_chain_view().rc_consensus_hash, &naddr);
5,346✔
697
            if let Err(_e) = self.comms.neighbor_send(network, &naddr, chunks_req) {
5,346✔
698
                info!(
×
699
                    "{:?}: {}: Failed to send StackerDBGetChunkInv to inbound {:?}: {:?}",
700
                    network.get_local_peer(),
×
701
                    &self.smart_contract_id,
×
702
                    &naddr,
×
703
                    &_e
×
704
                );
705
            }
5,346✔
706
        }
707
    }
3,894,948✔
708

709
    /// Establish sessions with remote replicas.
710
    /// We might not be connected to any yet.
711
    /// Clears self.replicas, and fills in self.connected_replicas with already-connected neighbors
712
    /// Returns Ok(true) if we can proceed to sync
713
    /// Returns Ok(false) if we should try this again
714
    /// Returns Err(NoSuchNeighbor) if we don't have anyone to talk to
715
    /// Returns Err(..) on DB query error
716
    pub fn connect_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
10,548,522✔
717
        if self.replicas.is_empty() {
10,548,522✔
718
            // find some from the peer DB
719
            let replicas = self.find_qualified_replicas(network)?;
6,703,029✔
720
            self.replicas = replicas;
6,703,029✔
721
        }
3,845,493✔
722
        debug!(
10,548,522✔
723
            "{:?}: {}: connect_begin: establish StackerDB sessions to {} neighbors (out of {} p2p peers)",
724
            network.get_local_peer(),
×
725
            &self.smart_contract_id,
×
726
            self.replicas.len(),
×
727
            network.get_num_p2p_convos();
×
728
            "replicas" => ?self.replicas
729
        );
730
        if self.replicas.is_empty() {
10,548,522✔
731
            // nothing to do
732
            return Err(net_error::NoSuchNeighbor);
6,630,498✔
733
        }
3,918,024✔
734

735
        let naddrs = mem::replace(&mut self.replicas, HashSet::new());
3,918,024✔
736
        for naddr in naddrs.into_iter() {
3,940,371✔
737
            if self.comms.is_neighbor_connecting(network, &naddr) {
3,940,371✔
738
                debug!(
22,176✔
739
                    "{:?}: {}: connect_begin: already connecting to StackerDB peer {:?}",
740
                    network.get_local_peer(),
×
741
                    &self.smart_contract_id,
×
742
                    &naddr
×
743
                );
744
                self.replicas.insert(naddr);
22,176✔
745
                continue;
22,176✔
746
            }
3,918,195✔
747
            if self.comms.has_neighbor_session(network, &naddr) {
3,918,195✔
748
                debug!(
3,875,292✔
749
                    "{:?}: {}: connect_begin: already connected to StackerDB peer {:?}",
750
                    network.get_local_peer(),
×
751
                    &self.smart_contract_id,
×
752
                    &naddr
×
753
                );
754
                self.connected_replicas.insert(naddr);
3,875,292✔
755
                continue;
3,875,292✔
756
            }
42,903✔
757

758
            debug!(
42,903✔
759
                "{:?}: {}: connect_begin: Send Handshake to StackerDB peer {:?}",
760
                network.get_local_peer(),
×
761
                &self.smart_contract_id,
×
762
                &naddr
×
763
            );
764
            match self.comms.neighbor_session_begin(network, &naddr) {
42,903✔
765
                Ok(true) => {
766
                    // connected!
767
                    debug!(
41,760✔
768
                        "{:?}: {}: connect_begin: connected to StackerDB peer {:?}",
769
                        network.get_local_peer(),
×
770
                        &self.smart_contract_id,
×
771
                        &naddr
×
772
                    );
773
                    self.num_attempted_connections += 1;
41,760✔
774
                    self.num_connections += 1;
41,760✔
775
                    self.connected_replicas.insert(naddr);
41,760✔
776
                }
777
                Ok(false) => {
1,089✔
778
                    // need to retry
1,089✔
779
                    self.num_attempted_connections += 1;
1,089✔
780
                    self.replicas.insert(naddr);
1,089✔
781
                }
1,089✔
782
                Err(_e) => {
54✔
783
                    debug!(
54✔
784
                        "{:?}: {}: Failed to begin session with {:?}: {:?}",
785
                        &network.get_local_peer(),
×
786
                        &self.smart_contract_id,
×
787
                        &naddr,
×
788
                        &_e
×
789
                    );
790
                }
791
            }
792
        }
793
        Ok(!self.connected_replicas.is_empty())
3,918,024✔
794
    }
10,548,522✔
795

796
    /// Finish up connecting to our replicas.
797
    /// Fills in self.connected_replicas based on receipt of a handshake accept.
798
    /// Returns true if we've received all pending messages
799
    /// Returns false otherwise
800
    pub fn connect_try_finish(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,084,992✔
801
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,084,992✔
802
            let data = match message.payload {
41,031✔
803
                StacksMessageType::StackerDBHandshakeAccept(_, db_data) => {
41,031✔
804
                    if network.get_chain_view().rc_consensus_hash != db_data.rc_consensus_hash {
41,031✔
805
                        // stale or inconsistent view. Do not proceed
806
                        debug!(
4,617✔
807
                            "{:?}: {}: remote peer {:?} has stale view ({} != {})",
808
                            network.get_local_peer(),
×
809
                            &self.smart_contract_id,
×
810
                            &naddr,
×
811
                            &network.get_chain_view().rc_consensus_hash,
×
812
                            &db_data.rc_consensus_hash
×
813
                        );
814
                        // don't unpin, since it's usually transient
815
                        self.connected_replicas.remove(&naddr);
4,617✔
816
                        continue;
4,617✔
817
                    }
36,414✔
818
                    db_data
36,414✔
819
                }
820
                StacksMessageType::Nack(data) => {
×
821
                    debug!(
×
822
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBHandshake with code {}",
823
                        &network.get_local_peer(),
×
824
                        &self.smart_contract_id,
×
825
                        &naddr,
×
826
                        data.error_code
827
                    );
828
                    if data.error_code == NackErrorCodes::StaleView
×
829
                        || data.error_code == NackErrorCodes::FutureView
×
830
                    {
×
831
                        self.connected_replicas.remove(&naddr);
×
832
                        self.stale_neighbors.insert(naddr);
×
833
                    } else {
×
834
                        self.unpin_connected_replica(network, &naddr);
×
835
                    }
×
836
                    continue;
×
837
                }
838
                x => {
×
839
                    info!(
×
840
                        "{:?}: {}: Received unexpected message {:?}",
841
                        &network.get_local_peer(),
×
842
                        &self.smart_contract_id,
×
843
                        &x
×
844
                    );
845
                    continue;
×
846
                }
847
            };
848

849
            if data
36,414✔
850
                .smart_contracts
36,414✔
851
                .iter()
36,414✔
852
                .find(|db_id| *db_id == &self.smart_contract_id)
514,953✔
853
                .is_none()
36,414✔
854
            {
855
                debug!(
243✔
856
                    "{:?}: {}: remote peer does not replicate",
857
                    network.get_local_peer(),
×
858
                    &self.smart_contract_id
×
859
                );
860

861
                // disconnect
862
                self.unpin_connected_replica(network, &naddr);
243✔
863
                continue;
243✔
864
            }
36,171✔
865

866
            debug!(
36,171✔
867
                "{:?}: {}: connect_try_finish: Received StackerDBHandshakeAccept from {:?} for {:?}",
868
                network.get_local_peer(),
×
869
                &self.smart_contract_id,
×
870
                &naddr,
×
871
                &data
×
872
            );
873

874
            // this neighbor is good
875
            self.connected_replicas.insert(naddr);
36,171✔
876
        }
877

878
        if self.comms.count_inflight() > 0 {
4,084,992✔
879
            // still blocked
880
            return Ok(false);
190,044✔
881
        }
3,894,948✔
882

883
        if self.connected_replicas.is_empty() {
3,894,948✔
884
            // no one to talk to
885
            debug!(
×
886
                "{:?}: {}: connect_try_finish: no valid replicas",
887
                &self.smart_contract_id,
×
888
                network.get_local_peer()
×
889
            );
890
            return Err(net_error::PeerNotConnected(format!(
×
891
                "StackerDB connect_try_finish: no valid replicas for {}",
×
892
                &self.smart_contract_id
×
893
            )));
×
894
        }
3,894,948✔
895

896
        Ok(true)
3,894,948✔
897
    }
4,084,992✔
898

899
    /// Ask each replica for its chunk inventories.
900
    /// Also ask each inbound neighbor.
901
    /// Clears self.connected_replicas.
902
    /// StackerDBGetChunksInv
903
    /// Always succeeds; does not block.
904
    pub fn getchunksinv_begin(&mut self, network: &mut PeerNetwork) {
3,894,948✔
905
        let naddrs = mem::replace(&mut self.connected_replicas, HashSet::new());
3,894,948✔
906
        let mut already_sent = vec![];
3,894,948✔
907
        debug!(
3,894,948✔
908
            "{:?}: {}: getchunksinv_begin: Send StackerDBGetChunksInv to {} replicas",
909
            network.get_local_peer(),
×
910
            &self.smart_contract_id,
×
911
            naddrs.len();
×
912
            "connected_replicas" => ?naddrs,
913
        );
914
        for naddr in naddrs.into_iter() {
3,911,706✔
915
            debug!(
3,911,706✔
916
                "{:?}: {}: getchunksinv_begin: Send StackerDBGetChunksInv at {} to {:?}",
917
                network.get_local_peer(),
×
918
                &self.smart_contract_id,
×
919
                &network.get_chain_view().rc_consensus_hash,
×
920
                &naddr,
×
921
            );
922
            let chunks_req = self.make_getchunkinv(&network.get_chain_view().rc_consensus_hash);
3,911,706✔
923
            if let Err(e) = self.comms.neighbor_send(network, &naddr, chunks_req) {
3,911,706✔
924
                debug!(
22,833✔
925
                    "{:?}: {}: failed to send StackerDBGetChunkInv to {:?}: {:?}",
926
                    network.get_local_peer(),
×
927
                    &self.smart_contract_id,
×
928
                    &naddr,
×
929
                    &e
×
930
                );
931
                continue;
22,833✔
932
            }
3,888,873✔
933
            already_sent.push(naddr);
3,888,873✔
934
        }
935
        self.send_getchunkinv_to_inbound_neighbors(network, &already_sent);
3,894,948✔
936
    }
3,894,948✔
937

938
    /// Collect each chunk inventory request.
939
    /// Restores self.connected_replicas based on messages received.
940
    /// Return Ok(true) if we've received all pending messages
941
    /// Return Ok(false) if not
942
    pub fn getchunksinv_try_finish(
17,022,060✔
943
        &mut self,
17,022,060✔
944
        network: &mut PeerNetwork,
17,022,060✔
945
    ) -> Result<bool, net_error> {
17,022,060✔
946
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
17,022,060✔
947
            let chunk_inv_opt = match message.payload {
3,818,520✔
948
                StacksMessageType::StackerDBChunkInv(data) => {
3,419,478✔
949
                    if data.slot_versions.len() != self.num_slots {
3,419,478✔
950
                        info!("{:?}: {}: Received malformed StackerDBChunkInv from {:?}: expected {} chunks, got {}", network.get_local_peer(), &self.smart_contract_id, &naddr, self.num_slots, data.slot_versions.len());
2,709✔
951
                        None
2,709✔
952
                    } else {
953
                        Some(data)
3,416,769✔
954
                    }
955
                }
956
                StacksMessageType::Nack(data) => {
399,042✔
957
                    debug!(
399,042✔
958
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBGetChunksInv with code {}",
959
                        network.get_local_peer(),
×
960
                        &self.smart_contract_id,
×
961
                        &naddr,
×
962
                        data.error_code
963
                    );
964
                    if data.error_code == NackErrorCodes::StaleView
399,042✔
965
                        || data.error_code == NackErrorCodes::FutureView
123,039✔
966
                    {
399,042✔
967
                        self.connected_replicas.remove(&naddr);
399,042✔
968
                        self.stale_neighbors.insert(naddr);
399,042✔
969
                    } else {
399,042✔
970
                        self.unpin_connected_replica(network, &naddr);
×
971
                    }
×
972
                    continue;
399,042✔
973
                }
974
                x => {
×
975
                    info!(
×
976
                        "{:?}: {}: Received unexpected message {:?}",
977
                        network.get_local_peer(),
×
978
                        &self.smart_contract_id,
×
979
                        &x
×
980
                    );
981
                    self.unpin_connected_replica(network, &naddr);
×
982
                    continue;
×
983
                }
984
            };
985
            debug!(
3,419,478✔
986
                "{:?}: {}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}: {:?}",
987
                network.get_local_peer(),
×
988
                &self.smart_contract_id,
×
989
                &naddr,
×
990
                &chunk_inv_opt
×
991
            );
992

993
            if let Some(chunk_inv) = chunk_inv_opt {
3,419,478✔
994
                self.chunk_invs.insert(naddr.clone(), chunk_inv);
3,416,769✔
995
                self.connected_replicas.insert(naddr);
3,416,769✔
996
            }
3,416,769✔
997
        }
998
        if self.comms.count_inflight() > 0 {
17,022,060✔
999
            // not done yet, so blocked
1000
            return Ok(false);
13,190,940✔
1001
        }
3,831,120✔
1002

1003
        // got everything. Calculate download priority
1004
        let priorities = self.make_chunk_request_schedule(network, None)?;
3,831,120✔
1005
        let expected_versions = self.stackerdbs.get_slot_versions(&self.smart_contract_id)?;
3,831,120✔
1006

1007
        self.chunk_fetch_priorities = priorities;
3,831,120✔
1008
        self.expected_versions = expected_versions;
3,831,120✔
1009
        Ok(true)
3,831,120✔
1010
    }
17,022,060✔
1011

1012
    /// Ask each prioritized replica for some chunks we need.
1013
    /// Return Ok(true) if we processed all requested chunks
1014
    /// Return Ok(false) if there are still some requests to make
1015
    pub fn getchunks_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,169,763✔
1016
        if self.chunk_fetch_priorities.is_empty() {
4,169,763✔
1017
            // done
1018
            debug!(
3,889,431✔
1019
                "{:?}: {}: getchunks_begin: no chunks prioritized",
1020
                network.get_local_peer(),
×
1021
                &self.smart_contract_id
×
1022
            );
1023
            return Ok(true);
3,889,431✔
1024
        }
280,332✔
1025

1026
        let mut cur_priority = self.next_chunk_fetch_priority % self.chunk_fetch_priorities.len();
280,332✔
1027

1028
        debug!(
280,332✔
1029
            "{:?}: {}: getchunks_begin: Issue up to {} StackerDBGetChunk requests",
1030
            &network.get_local_peer(),
×
1031
            &self.smart_contract_id,
×
1032
            self.request_capacity;
1033
            "chunk_fetch_priorities" => ?self.chunk_fetch_priorities,
1034
        );
1035

1036
        let mut requested = 0;
280,332✔
1037
        let mut unpin = HashSet::new();
280,332✔
1038

1039
        // fill up our comms with $capacity requests
1040
        for _i in 0..self.request_capacity {
1,681,992✔
1041
            if self.comms.count_inflight() >= self.request_capacity {
1,681,992✔
1042
                break;
×
1043
            }
1,681,992✔
1044
            let cur_fetch_priority = self
1,681,992✔
1045
                .chunk_fetch_priorities
1,681,992✔
1046
                .get_mut(cur_priority)
1,681,992✔
1047
                .ok_or_else(|| {
1,681,992✔
1048
                    error!(
×
1049
                        "Error setting chunk fetch priories. Priority index out of bounds";
1050
                        "cur_priority" => cur_priority,
×
1051
                    );
1052
                    net_error::InvalidState
×
1053
                })?;
×
1054

1055
            let chunk_request = cur_fetch_priority.0.clone();
1,681,992✔
1056
            let selected_neighbor_opt = cur_fetch_priority
1,681,992✔
1057
                .1
1,681,992✔
1058
                .iter()
1,681,992✔
1059
                .enumerate()
1,681,992✔
1060
                .find(|(_i, naddr)| !self.comms.has_inflight(naddr));
1,681,992✔
1061

1062
            let (idx, selected_neighbor) = if let Some(x) = selected_neighbor_opt {
1,681,992✔
1063
                x
80,181✔
1064
            } else {
1065
                continue;
1,601,811✔
1066
            };
1067

1068
            debug!(
80,181✔
1069
                "{:?}: {}: getchunks_begin: Send StackerDBGetChunk(id={},ver={}) at {} to {}",
1070
                &network.get_local_peer(),
×
1071
                &self.smart_contract_id,
×
1072
                chunk_request.slot_id,
1073
                chunk_request.slot_version,
1074
                &chunk_request.rc_consensus_hash,
×
1075
                &selected_neighbor
×
1076
            );
1077

1078
            if let Err(e) = self.comms.neighbor_send(
80,181✔
1079
                network,
80,181✔
1080
                selected_neighbor,
80,181✔
1081
                StacksMessageType::StackerDBGetChunk(chunk_request.clone()),
80,181✔
1082
            ) {
80,181✔
1083
                info!(
162✔
1084
                    "{:?}: {} Failed to request chunk {} from {:?}: {:?}",
1085
                    network.get_local_peer(),
162✔
1086
                    &self.smart_contract_id,
162✔
1087
                    chunk_request.slot_id,
1088
                    selected_neighbor,
1089
                    &e
162✔
1090
                );
1091
                unpin.insert(selected_neighbor.clone());
162✔
1092
                continue;
162✔
1093
            }
80,019✔
1094

1095
            requested += 1;
80,019✔
1096

1097
            // don't ask this neighbor again
1098
            cur_fetch_priority.1.remove(idx);
80,019✔
1099

1100
            // next-prioritized chunk
1101
            cur_priority = (cur_priority + 1) % self.chunk_fetch_priorities.len();
80,019✔
1102
        }
1103
        let _ = unpin
280,332✔
1104
            .into_iter()
280,332✔
1105
            .map(|naddr| self.unpin_connected_replica(network, &naddr));
280,332✔
1106

1107
        if requested == 0 && self.comms.count_inflight() == 0 {
280,332✔
1108
            return Err(net_error::PeerNotConnected(format!(
639✔
1109
                "StackerDB getchunks_begin: no chunks to request"
639✔
1110
            )));
639✔
1111
        }
279,693✔
1112

1113
        self.next_chunk_fetch_priority = cur_priority;
279,693✔
1114

1115
        Ok(self.chunk_fetch_priorities.is_empty())
279,693✔
1116
    }
4,169,763✔
1117

1118
    /// Collect chunk replies from neighbors
1119
    /// Returns Ok(true) if all inflight messages have been received (or dealt with)
1120
    /// Returns Ok(false) otherwise
1121
    pub fn getchunks_try_finish(
4,169,124✔
1122
        &mut self,
4,169,124✔
1123
        network: &mut PeerNetwork,
4,169,124✔
1124
        config: &StackerDBConfig,
4,169,124✔
1125
    ) -> Result<bool, net_error> {
4,169,124✔
1126
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,169,124✔
1127
            let data = match message.payload {
79,587✔
1128
                StacksMessageType::StackerDBChunk(data) => data,
78,885✔
1129
                StacksMessageType::Nack(data) => {
702✔
1130
                    debug!(
702✔
1131
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBGetChunk with code {}",
1132
                        network.get_local_peer(),
×
1133
                        &self.smart_contract_id,
×
1134
                        &naddr,
×
1135
                        data.error_code
1136
                    );
1137
                    if data.error_code == NackErrorCodes::StaleView
702✔
1138
                        || data.error_code == NackErrorCodes::FutureView
549✔
1139
                    {
153✔
1140
                        self.stale_neighbors.insert(naddr);
153✔
1141
                    } else if data.error_code == NackErrorCodes::StaleVersion {
621✔
1142
                        // try again immediately, without throttling
531✔
1143
                        self.stale_inv = true;
531✔
1144
                    }
540✔
1145
                    continue;
702✔
1146
                }
1147
                x => {
×
1148
                    info!(
×
1149
                        "{:?}: {}: Received unexpected message {:?}",
1150
                        network.get_local_peer(),
×
1151
                        &self.smart_contract_id,
×
1152
                        &x
×
1153
                    );
1154
                    self.unpin_connected_replica(network, &naddr);
×
1155
                    continue;
×
1156
                }
1157
            };
1158

1159
            // validate
1160
            if !self.validate_downloaded_chunk(network, config, &data)? {
78,885✔
1161
                info!(
×
1162
                    "{:?}: {}: Remote neighbor {:?} served an invalid chunk for ID {}",
1163
                    network.get_local_peer(),
×
1164
                    &self.smart_contract_id,
×
1165
                    &naddr,
×
1166
                    data.slot_id
1167
                );
1168
                self.unpin_connected_replica(network, &naddr);
×
1169
                continue;
×
1170
            }
78,885✔
1171

1172
            // update bookkeeping
1173
            debug!(
78,885✔
1174
                "{:?}: {}, getchunks_try_finish: Received StackerDBChunk from {:?}",
1175
                network.get_local_peer(),
×
1176
                &self.smart_contract_id,
×
1177
                &naddr
×
1178
            );
1179
            self.add_downloaded_chunk(naddr, data);
78,885✔
1180
        }
1181

1182
        Ok(self.comms.count_inflight() == 0)
4,169,124✔
1183
    }
4,169,124✔
1184

1185
    /// Push out chunks to peers
1186
    /// Returns true if there are no more chunks to push.
1187
    /// Returns false if there are
1188
    pub fn pushchunks_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,089,861✔
1189
        if self.chunk_push_priorities.is_empty() && self.push_round != self.rounds {
4,089,861✔
1190
            // only do this once per round
1191
            let priorities = self.make_chunk_push_schedule(network)?;
3,829,959✔
1192
            self.chunk_push_priorities = priorities;
3,829,959✔
1193
            self.push_round = self.rounds;
3,829,959✔
1194
        }
259,902✔
1195
        if self.chunk_push_priorities.is_empty() {
4,089,861✔
1196
            // done
1197
            debug!(
3,795,579✔
1198
                "{:?}:{}: pushchunks_begin: no chunks prioritized",
1199
                network.get_local_peer(),
×
1200
                &self.smart_contract_id
×
1201
            );
1202
            return Ok(true);
3,795,579✔
1203
        }
294,282✔
1204

1205
        let mut cur_priority = self.next_chunk_push_priority % self.chunk_push_priorities.len();
294,282✔
1206

1207
        debug!(
294,282✔
1208
            "{:?}: {}: pushchunks_begin: Send up to {} StackerDBChunk pushes",
1209
            &network.get_local_peer(),
×
1210
            &self.smart_contract_id,
×
1211
            self.chunk_push_priorities.len();
×
1212
            "chunk_push_priorities" => ?self.chunk_push_priorities
1213
        );
1214

1215
        // fill up our comms with $capacity requests
1216
        let mut num_sent = 0;
294,282✔
1217
        for _i in 0..self.chunk_push_priorities.len() {
533,952✔
1218
            if self.comms.count_inflight() >= self.request_capacity {
533,952✔
1219
                break;
×
1220
            }
533,952✔
1221
            let cur_push_priority = self
533,952✔
1222
                .chunk_push_priorities
533,952✔
1223
                .get_mut(cur_priority)
533,952✔
1224
                .ok_or_else(|| {
533,952✔
1225
                    error!(
×
1226
                        "Error setting chunk push priories. Priority index out of bounds";
1227
                        "cur_priority" => cur_priority,
×
1228
                    );
1229
                    net_error::InvalidState
×
1230
                })?;
×
1231

1232
            let chunk_push = cur_push_priority.0.clone();
533,952✔
1233
            // try the first neighbor in the chunk_push_priorities list
1234
            let selected_neighbor_opt = cur_push_priority.1.first().map(|neighbor| (0, neighbor));
533,952✔
1235

1236
            let Some((idx, selected_neighbor)) = selected_neighbor_opt else {
533,952✔
1237
                debug!("{:?}: {}: pushchunks_begin: no available neighbor to send StackerDBChunk(id={},ver={}) to",
427,077✔
1238
                    &network.get_local_peer(),
×
1239
                    &self.smart_contract_id,
×
1240
                    chunk_push.chunk_data.slot_id,
1241
                    chunk_push.chunk_data.slot_version,
1242
                );
1243

1244
                // next-prioritized chunk
1245
                cur_priority = (cur_priority + 1) % self.chunk_push_priorities.len();
427,077✔
1246
                continue;
427,077✔
1247
            };
1248

1249
            debug!(
106,875✔
1250
                "{:?}: {}: pushchunks_begin: Send StackerDBChunk(id={},ver={}) at {} to {}",
1251
                &network.get_local_peer(),
×
1252
                &self.smart_contract_id,
×
1253
                chunk_push.chunk_data.slot_id,
1254
                chunk_push.chunk_data.slot_version,
1255
                &chunk_push.rc_consensus_hash,
×
1256
                &selected_neighbor
×
1257
            );
1258

1259
            let slot_id = chunk_push.chunk_data.slot_id;
106,875✔
1260
            let slot_version = chunk_push.chunk_data.slot_version;
106,875✔
1261
            if let Err(e) = self.comms.neighbor_send(
106,875✔
1262
                network,
106,875✔
1263
                selected_neighbor,
106,875✔
1264
                StacksMessageType::StackerDBPushChunk(chunk_push),
106,875✔
1265
            ) {
106,875✔
1266
                info!(
117✔
1267
                    "{:?}: {}: Failed to send chunk {} from {:?}: {:?}",
1268
                    network.get_local_peer(),
117✔
1269
                    &self.smart_contract_id,
117✔
1270
                    slot_id,
1271
                    selected_neighbor,
1272
                    &e
117✔
1273
                );
1274
                continue;
117✔
1275
            }
106,758✔
1276

1277
            // record what we just sent
1278
            self.chunk_push_receipts
106,758✔
1279
                .insert(selected_neighbor.clone(), (slot_id, slot_version));
106,758✔
1280

1281
            // don't send to this neighbor again
1282
            cur_push_priority.1.remove(idx);
106,758✔
1283

1284
            // next-prioritized chunk
1285
            cur_priority = (cur_priority + 1) % self.chunk_push_priorities.len();
106,758✔
1286

1287
            num_sent += 1;
106,758✔
1288
            if num_sent > self.request_capacity {
106,758✔
1289
                break;
×
1290
            }
106,758✔
1291
        }
1292
        self.next_chunk_push_priority = cur_priority;
294,282✔
1293
        Ok(self
294,282✔
1294
            .chunk_push_priorities
294,282✔
1295
            .iter()
294,282✔
1296
            .fold(0usize, |acc, (_chunk, num_naddrs)| {
533,952✔
1297
                acc.saturating_add(num_naddrs.len())
533,952✔
1298
            })
533,952✔
1299
            == 0)
1300
    }
4,089,861✔
1301

1302
    /// Collect push-chunk replies from neighbors.
1303
    /// If a remote neighbor replies with a chunk-inv for a pushed chunk which contains newer data
1304
    /// than we have, then set `self.need_resync` to true.
1305
    /// Returns true if all inflight messages have been received (or dealt with)
1306
    /// Returns false otherwise
1307
    pub fn pushchunks_try_finish(&mut self, network: &mut PeerNetwork) -> bool {
4,089,861✔
1308
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,089,861✔
1309
            let new_chunk_inv = match message.payload {
59,472✔
1310
                StacksMessageType::StackerDBChunkInv(data) => data,
59,472✔
1311
                StacksMessageType::Nack(data) => {
×
1312
                    debug!(
×
1313
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBChunk with code {}",
1314
                        network.get_local_peer(),
×
1315
                        &self.smart_contract_id,
×
1316
                        &naddr,
×
1317
                        data.error_code
1318
                    );
1319
                    if data.error_code == NackErrorCodes::StaleView
×
1320
                        || data.error_code == NackErrorCodes::FutureView
×
1321
                    {
×
1322
                        self.stale_neighbors.insert(naddr);
×
1323
                    }
×
1324
                    continue;
×
1325
                }
1326
                x => {
×
1327
                    info!(
×
1328
                        "{:?}: {}: Received unexpected message {:?}",
1329
                        network.get_local_peer(),
×
1330
                        &self.smart_contract_id,
×
1331
                        &x
×
1332
                    );
1333
                    continue;
×
1334
                }
1335
            };
1336

1337
            // must be well-formed
1338
            if new_chunk_inv.slot_versions.len() != self.num_slots {
59,472✔
1339
                info!("{:?}: {}: Received malformed StackerDBChunkInv from {:?}: expected {} chunks, got {}", network.get_local_peer(), &self.smart_contract_id, &naddr, self.num_slots, new_chunk_inv.slot_versions.len());
×
1340
                continue;
×
1341
            }
59,472✔
1342

1343
            // update bookkeeping
1344
            debug!(
59,472✔
1345
                "{:?}: {}: pushchunks_try_finish: Received StackerDBChunkInv from {:?}",
1346
                network.get_local_peer(),
×
1347
                &self.smart_contract_id,
×
1348
                &naddr
×
1349
            );
1350

1351
            if let Some((slot_id, _)) = self.chunk_push_receipts.get(&naddr) {
59,472✔
1352
                self.need_resync = self.need_resync
59,472✔
1353
                    || self.add_pushed_chunk(network, naddr, new_chunk_inv, *slot_id);
59,472✔
1354
            }
×
1355
        }
1356

1357
        let inflight = self.comms.count_inflight();
4,089,861✔
1358
        debug!(
4,089,861✔
1359
            "{:?}: {}: inflight messages: {:?}",
1360
            network.get_local_peer(),
×
1361
            &self.smart_contract_id,
×
1362
            inflight
1363
        );
1364
        inflight == 0
4,089,861✔
1365
    }
4,089,861✔
1366

1367
    /// Recalculate the download schedule based on chunkinvs received on push
1368
    pub fn recalculate_chunk_request_schedule(
59,472✔
1369
        &mut self,
59,472✔
1370
        network: &PeerNetwork,
59,472✔
1371
    ) -> Result<(), net_error> {
59,472✔
1372
        // figure out the new expected versions
1373
        let mut expected_versions = vec![0u32; self.num_slots];
59,472✔
1374
        for (_, chunk_inv) in self.chunk_invs.iter() {
59,472✔
1375
            for (slot_version, expected_version) in chunk_inv
294,732✔
1376
                .slot_versions
59,472✔
1377
                .iter()
59,472✔
1378
                .zip(expected_versions.iter_mut())
59,472✔
1379
            {
294,732✔
1380
                *expected_version = (*slot_version).max(*expected_version);
294,732✔
1381
            }
294,732✔
1382
        }
1383

1384
        let priorities =
59,472✔
1385
            self.make_chunk_request_schedule(network, Some(expected_versions.clone()))?;
59,472✔
1386

1387
        self.chunk_fetch_priorities = priorities;
59,472✔
1388
        self.expected_versions = expected_versions;
59,472✔
1389
        Ok(())
59,472✔
1390
    }
59,472✔
1391

1392
    /// Forcibly wake up the state machine if it is throttled
1393
    pub fn wakeup(&mut self) {
2,221,776✔
1394
        debug!("wake up StackerDB sync for {}", &self.smart_contract_id);
2,221,776✔
1395
        self.last_run_ts = 0;
2,221,776✔
1396
    }
2,221,776✔
1397

1398
    /// Run the state machine.
1399
    /// If we run to completion, then reset and return the sync result.
1400
    /// Otherwise, if there's still more work to do, then return None
1401
    pub fn run(
222,408,225✔
1402
        &mut self,
222,408,225✔
1403
        network: &mut PeerNetwork,
222,408,225✔
1404
        config: &StackerDBConfig,
222,408,225✔
1405
    ) -> Result<Option<StackerDBSyncResult>, net_error> {
222,408,225✔
1406
        if network.get_connection_opts().disable_stackerdb_sync {
222,408,225✔
1407
            test_debug!(
×
1408
                "{:?}: stacker DB sync is disabled",
1409
                network.get_local_peer()
×
1410
            );
1411
            return Ok(None);
×
1412
        }
222,408,225✔
1413

1414
        // make sure we have an up-to-date chain view.
1415
        // If not, then abort and immediately retry the sync (since any queued messages we have are
1416
        // likely gonna fail)
1417
        if let Some(rc_consensus_hash) = self.rc_consensus_hash.as_ref() {
222,408,225✔
1418
            if network.get_chain_view().rc_consensus_hash != *rc_consensus_hash {
211,912,371✔
1419
                debug!("{:?}: {}: Resetting and restarting running StackerDB sync due to chain view change", network.get_local_peer(), &self.smart_contract_id);
1,765,917✔
1420
                let result = self.reset(Some(network), config);
1,765,917✔
1421
                self.state = StackerDBSyncState::ConnectBegin;
1,765,917✔
1422
                self.rc_consensus_hash = Some(network.get_chain_view().rc_consensus_hash.clone());
1,765,917✔
1423
                self.wakeup();
1,765,917✔
1424
                return Ok(Some(result));
1,765,917✔
1425
            }
210,146,454✔
1426
        } else {
10,495,854✔
1427
            self.rc_consensus_hash = Some(network.get_chain_view().rc_consensus_hash.clone());
10,495,854✔
1428
        }
10,495,854✔
1429

1430
        // throttle to write_freq
1431
        if self.last_run_ts + config.write_freq.max(1) > get_epoch_time_secs() {
220,642,308✔
1432
            debug!(
196,297,272✔
1433
                "{:?}: {}: stacker DB sync is throttled until {}",
1434
                network.get_local_peer(),
×
1435
                &self.smart_contract_id,
×
1436
                self.last_run_ts + config.write_freq
×
1437
            );
1438
            return Ok(None);
196,297,272✔
1439
        }
24,345,036✔
1440

1441
        loop {
1442
            debug!(
54,270,207✔
1443
                "{:?}: {}: stacker DB sync state is {:?}",
1444
                network.get_local_peer(),
×
1445
                &self.smart_contract_id,
×
1446
                &self.state
×
1447
            );
1448

1449
            let mut blocked = true;
54,270,207✔
1450
            match self.state {
54,270,207✔
1451
                StackerDBSyncState::ConnectBegin => {
1452
                    let done = match self.connect_begin(network) {
10,548,522✔
1453
                        Ok(done) => done,
3,918,024✔
1454
                        Err(net_error::NoSuchNeighbor) => {
1455
                            // nothing to do
1456
                            self.state = StackerDBSyncState::Finished;
6,630,498✔
1457
                            blocked = false;
6,630,498✔
1458
                            false
6,630,498✔
1459
                        }
1460
                        Err(e) => {
×
1461
                            return Err(e);
×
1462
                        }
1463
                    };
1464
                    if done {
10,548,522✔
1465
                        self.state = StackerDBSyncState::ConnectFinish;
3,895,191✔
1466
                        blocked = false;
3,895,191✔
1467
                    }
9,772,290✔
1468
                }
1469
                StackerDBSyncState::ConnectFinish => {
1470
                    let done = self.connect_try_finish(network)?;
4,084,992✔
1471
                    if done {
4,084,992✔
1472
                        self.state = StackerDBSyncState::GetChunksInvBegin;
3,894,948✔
1473
                        blocked = false;
3,894,948✔
1474
                    }
3,941,676✔
1475
                }
1476
                StackerDBSyncState::GetChunksInvBegin => {
3,894,948✔
1477
                    // does not block
3,894,948✔
1478
                    self.getchunksinv_begin(network);
3,894,948✔
1479
                    self.state = StackerDBSyncState::GetChunksInvFinish;
3,894,948✔
1480
                    blocked = false;
3,894,948✔
1481
                }
3,894,948✔
1482
                StackerDBSyncState::GetChunksInvFinish => {
1483
                    let done = self.getchunksinv_try_finish(network)?;
17,022,060✔
1484
                    if done {
17,022,060✔
1485
                        self.state = StackerDBSyncState::GetChunks;
3,831,120✔
1486
                        blocked = false;
3,831,120✔
1487
                    }
13,190,940✔
1488
                }
1489
                StackerDBSyncState::GetChunks => {
1490
                    if network.get_connection_opts().disable_stackerdb_get_chunks {
4,169,763✔
1491
                        // fault injection -- force the system to rely exclusively on push-chunk
1492
                        // behavior
1493
                        self.state = StackerDBSyncState::PushChunks;
×
1494
                        continue;
×
1495
                    }
4,169,763✔
1496

1497
                    let requests_finished = self.getchunks_begin(network)?;
4,169,763✔
1498
                    let inflight_finished = self.getchunks_try_finish(network, config)?;
4,169,124✔
1499
                    let done = requests_finished && inflight_finished;
4,169,124✔
1500
                    if done {
4,169,124✔
1501
                        self.state = StackerDBSyncState::PushChunks;
3,889,431✔
1502
                        blocked = false;
3,889,431✔
1503
                    }
3,889,431✔
1504
                }
1505
                StackerDBSyncState::PushChunks => {
1506
                    let pushes_finished = self.pushchunks_begin(network)?;
4,089,861✔
1507
                    let inflight_finished = self.pushchunks_try_finish(network);
4,089,861✔
1508
                    let done = pushes_finished && inflight_finished;
4,089,861✔
1509
                    if done {
4,089,861✔
1510
                        if self.need_resync
3,889,035✔
1511
                            && !network.get_connection_opts().disable_stackerdb_get_chunks
59,472✔
1512
                        {
1513
                            // someone pushed newer chunk data to us, and getting chunks is
1514
                            // enabled, so immediately go request them
1515
                            debug!(
59,472✔
1516
                                "{:?}: {}: immediately retry StackerDB GetChunks due to PushChunk NACK",
1517
                                network.get_local_peer(),
×
1518
                                &self.smart_contract_id
×
1519
                            );
1520
                            self.recalculate_chunk_request_schedule(network)?;
59,472✔
1521
                            self.state = StackerDBSyncState::GetChunks;
59,472✔
1522
                        } else {
3,829,563✔
1523
                            // done syncing
3,829,563✔
1524
                            self.state = StackerDBSyncState::Finished;
3,829,563✔
1525
                        }
3,829,563✔
1526
                        self.need_resync = false;
3,889,035✔
1527
                        blocked = false;
3,889,035✔
1528
                    }
200,826✔
1529
                }
1530
                StackerDBSyncState::Finished => {
1531
                    let stale_inv = self.stale_inv;
10,460,061✔
1532

1533
                    let result = self.reset(Some(network), config);
10,460,061✔
1534
                    self.state = StackerDBSyncState::ConnectBegin;
10,460,061✔
1535

1536
                    if stale_inv {
10,460,061✔
1537
                        debug!(
×
1538
                            "{:?}: {}: immediately retry StackerDB sync due to stale inventory",
1539
                            network.get_local_peer(),
×
1540
                            &self.smart_contract_id
×
1541
                        );
1542
                        self.wakeup();
×
1543
                    }
10,460,061✔
1544
                    return Ok(Some(result));
10,460,061✔
1545
                }
1546
            };
1547

1548
            if blocked {
43,809,507✔
1549
                return Ok(None);
13,884,336✔
1550
            }
29,925,171✔
1551
        }
1552
    }
222,408,225✔
1553
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc