• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 26250451051-1

21 May 2026 08:11PM UTC coverage: 85.585% (-0.1%) from 85.712%
26250451051-1

Pull #7215

github

ec9d4c
web-flow
Merge 9487bf852 into af1280aac
Pull Request #7215: Chore: fix flake in non_blocking_minority_configured_to_favour_...

188844 of 220651 relevant lines covered (85.58%)

18975267.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.2
/stackslib/src/net/stackerdb/sync.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
use std::collections::{HashMap, HashSet};
18
use std::mem;
19

20
use clarity::vm::types::QualifiedContractIdentifier;
21
use rand::prelude::SliceRandom;
22
use rand::{thread_rng, Rng};
23
use stacks_common::types::chainstate::ConsensusHash;
24
use stacks_common::util::get_epoch_time_secs;
25

26
use crate::net::db::PeerDB;
27
use crate::net::neighbors::comms::ToNeighborKey;
28
use crate::net::neighbors::NeighborComms;
29
use crate::net::p2p::PeerNetwork;
30
use crate::net::stackerdb::{
31
    StackerDBConfig, StackerDBSync, StackerDBSyncResult, StackerDBSyncState, StackerDBs,
32
};
33
use crate::net::{
34
    Error as net_error, NackErrorCodes, NeighborAddress, StackerDBChunkData, StackerDBChunkInvData,
35
    StackerDBGetChunkData, StackerDBGetChunkInvData, StackerDBPushChunkData, StacksMessageType,
36
};
37

38
/// Initial value of `request_capacity` for a freshly-created `StackerDBSync`.
const MAX_CHUNKS_IN_FLIGHT: usize = 6;
39
/// Initial value of `max_neighbors` for a freshly-created `StackerDBSync`; also the cap on how
/// many replicas are looked up in the peer DB per query.
const MAX_DB_NEIGHBORS: usize = 32;
40

41
impl<NC: NeighborComms> StackerDBSync<NC> {
42
    pub fn new(
7,735✔
43
        smart_contract: QualifiedContractIdentifier,
7,735✔
44
        config: &StackerDBConfig,
7,735✔
45
        comms: NC,
7,735✔
46
        stackerdbs: StackerDBs,
7,735✔
47
    ) -> StackerDBSync<NC> {
7,735✔
48
        let mut dbsync = StackerDBSync {
7,735✔
49
            state: StackerDBSyncState::ConnectBegin,
7,735✔
50
            rc_consensus_hash: None,
7,735✔
51
            smart_contract_id: smart_contract,
7,735✔
52
            num_slots: config.num_slots() as usize,
7,735✔
53
            write_freq: config.write_freq,
7,735✔
54
            chunk_invs: HashMap::new(),
7,735✔
55
            chunk_fetch_priorities: vec![],
7,735✔
56
            chunk_push_priorities: vec![],
7,735✔
57
            chunk_push_receipts: HashMap::new(),
7,735✔
58
            next_chunk_fetch_priority: 0,
7,735✔
59
            next_chunk_push_priority: 0,
7,735✔
60
            expected_versions: vec![],
7,735✔
61
            downloaded_chunks: HashMap::new(),
7,735✔
62
            replicas: HashSet::new(),
7,735✔
63
            connected_replicas: HashSet::new(),
7,735✔
64
            comms,
7,735✔
65
            stackerdbs,
7,735✔
66
            request_capacity: MAX_CHUNKS_IN_FLIGHT,
7,735✔
67
            max_neighbors: MAX_DB_NEIGHBORS,
7,735✔
68
            total_stored: 0,
7,735✔
69
            total_pushed: 0,
7,735✔
70
            last_run_ts: 0,
7,735✔
71
            need_resync: false,
7,735✔
72
            stale_inv: false,
7,735✔
73
            stale_neighbors: HashSet::new(),
7,735✔
74
            num_connections: 0,
7,735✔
75
            num_attempted_connections: 0,
7,735✔
76
            rounds: 0,
7,735✔
77
            push_round: 0,
7,735✔
78
            last_eviction_time: get_epoch_time_secs(),
7,735✔
79
        };
7,735✔
80
        dbsync.reset(None, config);
7,735✔
81
        dbsync
7,735✔
82
    }
7,735✔
83

84
    /// Find stackerdb replicas and apply filtering rules
85
    fn find_qualified_replicas(
19,234,664✔
86
        &self,
19,234,664✔
87
        network: &PeerNetwork,
19,234,664✔
88
    ) -> Result<HashSet<NeighborAddress>, net_error> {
19,234,664✔
89
        let mut found = HashSet::new();
19,234,664✔
90
        let mut min_age =
19,234,664✔
91
            get_epoch_time_secs().saturating_sub(network.get_connection_opts().max_neighbor_age);
19,234,664✔
92

93
        let local_naddr = network.get_local_peer().to_neighbor_addr();
19,234,664✔
94

95
        while found.len() < self.max_neighbors {
38,469,328✔
96
            let peers_iter = PeerDB::find_stacker_db_replicas(
38,469,328✔
97
                network.peerdb_conn(),
38,469,328✔
98
                network.get_local_peer().network_id,
38,469,328✔
99
                &self.smart_contract_id,
38,469,328✔
100
                min_age,
38,469,328✔
101
                self.max_neighbors,
38,469,328✔
102
            )?
×
103
            .into_iter()
38,469,328✔
104
            .map(|neighbor| {
38,469,342✔
105
                (
8,695,462✔
106
                    NeighborAddress::from_neighbor(&neighbor),
8,695,462✔
107
                    neighbor.last_contact_time,
8,695,462✔
108
                )
8,695,462✔
109
            })
8,695,462✔
110
            .filter(|(naddr, _)| {
38,469,342✔
111
                if naddr.addrbytes.is_anynet() {
8,695,462✔
112
                    return false;
×
113
                }
8,695,462✔
114
                if naddr.public_key_hash == local_naddr.public_key_hash {
8,695,462✔
115
                    // don't talk to us by another address
116
                    return false;
×
117
                }
8,695,462✔
118
                if !network.get_connection_opts().private_neighbors
8,695,462✔
119
                    && naddr.addrbytes.is_in_private_range()
×
120
                {
121
                    return false;
×
122
                }
8,695,462✔
123
                true
8,695,462✔
124
            });
8,695,462✔
125

126
            for (peer, last_contact) in peers_iter {
38,469,342✔
127
                found.insert(peer);
8,695,462✔
128
                if found.len() >= self.max_neighbors {
8,695,462✔
129
                    break;
×
130
                }
8,695,462✔
131
                min_age = min_age.min(last_contact);
8,695,462✔
132
            }
133

134
            // search for older neighbors
135
            if min_age > 1 {
38,469,328✔
136
                min_age = 1;
19,234,664✔
137
            } else if min_age <= 1 {
19,234,664✔
138
                break;
19,234,664✔
139
            }
×
140
        }
141
        Ok(found)
19,234,664✔
142
    }
19,234,664✔
143

144
    /// Calculate the new set of replicas to contact.
145
    /// This is the same as the set that was connected on the last sync, plus any
146
    /// config hints and discovered nodes from the DB.
147
    fn find_new_replicas(
12,393,837✔
148
        &self,
12,393,837✔
149
        mut connected_replicas: HashSet<NeighborAddress>,
12,393,837✔
150
        network: Option<&PeerNetwork>,
12,393,837✔
151
        config: &StackerDBConfig,
12,393,837✔
152
    ) -> Result<HashSet<NeighborAddress>, net_error> {
12,393,837✔
153
        // keep all connected replicas, and replenish from config hints and the DB as needed
154
        let mut peers = config.hint_replicas.clone();
12,393,837✔
155
        if let Some(network) = network {
12,393,837✔
156
            let extra_peers = self.find_qualified_replicas(network)?;
12,332,956✔
157
            peers.extend(extra_peers);
12,332,956✔
158
        }
60,881✔
159

160
        peers.shuffle(&mut thread_rng());
12,393,837✔
161

162
        for peer in peers {
12,393,837✔
163
            if connected_replicas.len() >= config.max_neighbors {
4,272,806✔
164
                break;
×
165
            }
4,272,806✔
166
            connected_replicas.insert(peer);
4,272,806✔
167
        }
168
        Ok(connected_replicas)
12,393,837✔
169
    }
12,393,837✔
170

171
    /// Reset this state machine, and get the StackerDBSyncResult with newly-obtained chunk data
172
    /// and newly-learned information about connection statistics
173
    pub fn reset(
12,393,837✔
174
        &mut self,
12,393,837✔
175
        network: Option<&PeerNetwork>,
12,393,837✔
176
        config: &StackerDBConfig,
12,393,837✔
177
    ) -> StackerDBSyncResult {
12,393,837✔
178
        debug!(
12,393,837✔
179
            "{}: Reset with config {:?}",
180
            &self.smart_contract_id, config
×
181
        );
182
        let mut chunks = vec![];
12,393,837✔
183
        let downloaded_chunks = mem::replace(&mut self.downloaded_chunks, HashMap::new());
12,393,837✔
184
        for (_, mut data) in downloaded_chunks.into_iter() {
12,393,837✔
185
            chunks.append(&mut data);
50,022✔
186
        }
50,022✔
187

188
        let chunk_invs = mem::replace(&mut self.chunk_invs, HashMap::new());
12,393,837✔
189
        let result = StackerDBSyncResult {
12,393,837✔
190
            contract_id: self.smart_contract_id.clone(),
12,393,837✔
191
            chunk_invs,
12,393,837✔
192
            chunks_to_store: chunks,
12,393,837✔
193
            stale: std::mem::replace(&mut self.stale_neighbors, HashSet::new()),
12,393,837✔
194
            num_connections: self.num_connections,
12,393,837✔
195
            num_attempted_connections: self.num_attempted_connections,
12,393,837✔
196
        };
12,393,837✔
197

198
        // keep all connected replicas, and replenish from config hints and the DB as needed
199
        let connected_replicas = mem::replace(&mut self.connected_replicas, HashSet::new());
12,393,837✔
200
        let next_connected_replicas =
12,393,837✔
201
            if let Ok(new_replicas) = self.find_new_replicas(connected_replicas, network, config) {
12,393,837✔
202
                new_replicas
12,393,837✔
203
            } else {
204
                self.replicas.clone()
×
205
            };
206

207
        self.replicas = next_connected_replicas;
12,393,837✔
208

209
        self.chunk_fetch_priorities.clear();
12,393,837✔
210
        self.chunk_push_priorities.clear();
12,393,837✔
211
        self.next_chunk_fetch_priority = 0;
12,393,837✔
212
        self.next_chunk_push_priority = 0;
12,393,837✔
213
        self.chunk_push_receipts.clear();
12,393,837✔
214
        self.expected_versions.clear();
12,393,837✔
215
        self.downloaded_chunks.clear();
12,393,837✔
216

217
        // reset comms, but keep all connected replicas pinned.
218
        // Randomly evict one every so often.
219
        self.comms.reset();
12,393,837✔
220
        if let Some(network) = network {
12,393,837✔
221
            let mut eviction_index = None;
12,332,956✔
222
            if self.last_eviction_time + 60 < get_epoch_time_secs() {
12,332,956✔
223
                self.last_eviction_time = get_epoch_time_secs();
157,932✔
224
                if !self.replicas.is_empty() {
157,932✔
225
                    eviction_index = Some(thread_rng().gen_range(0..self.replicas.len()));
58,302✔
226
                }
147,483✔
227
            }
12,175,024✔
228

229
            let remove_naddr = eviction_index.and_then(|idx| {
12,332,956✔
230
                let removed = self.replicas.iter().nth(idx).cloned();
58,302✔
231
                if let Some(naddr) = removed.as_ref() {
58,302✔
232
                    debug!(
58,302✔
233
                        "{:?}: {}: don't reuse connection for replica {:?}",
234
                        network.get_local_peer(),
×
235
                        &self.smart_contract_id,
×
236
                        &naddr,
×
237
                    );
238
                }
×
239
                removed
58,302✔
240
            });
58,302✔
241

242
            if let Some(naddr) = remove_naddr {
12,332,956✔
243
                self.replicas.remove(&naddr);
58,302✔
244
            }
12,274,654✔
245

246
            // retain the remaining replica connections
247
            for naddr in self.replicas.iter() {
12,332,963✔
248
                if let Some(event_id) = network.get_event_id(&naddr.to_neighbor_key(network)) {
4,238,858✔
249
                    self.comms.pin_connection(event_id);
4,202,237✔
250
                    debug!(
4,202,237✔
251
                        "{:?}: {}: reuse connection for replica {:?} on event {}",
252
                        network.get_local_peer(),
×
253
                        &self.smart_contract_id,
×
254
                        &naddr,
×
255
                        event_id
256
                    );
257
                }
36,621✔
258
            }
259
        }
60,881✔
260

261
        // reload from config
262
        self.num_slots = config.num_slots() as usize;
12,393,837✔
263
        self.write_freq = config.write_freq;
12,393,837✔
264

265
        self.need_resync = false;
12,393,837✔
266
        self.stale_inv = false;
12,393,837✔
267
        self.last_run_ts = get_epoch_time_secs();
12,393,837✔
268

269
        self.state = StackerDBSyncState::ConnectBegin;
12,393,837✔
270
        self.num_connections = 0;
12,393,837✔
271
        self.num_attempted_connections = 0;
12,393,837✔
272
        self.rounds += 1;
12,393,837✔
273
        self.rc_consensus_hash = None;
12,393,837✔
274
        result
12,393,837✔
275
    }
12,393,837✔
276

277
    /// Get the set of connection IDs in use
278
    pub fn get_pinned_connections(&self) -> &HashSet<usize> {
70,667,018✔
279
        self.comms.get_pinned_connections()
70,667,018✔
280
    }
70,667,018✔
281

282
    /// Unpin and remove a connected replica by naddr
283
    pub fn unpin_connected_replica(&mut self, network: &PeerNetwork, naddr: &NeighborAddress) {
×
284
        let nk = naddr.to_neighbor_key(network);
×
285
        if let Some(event_id) = network.get_event_id(&nk) {
×
286
            self.comms.unpin_connection(event_id);
×
287
        }
×
288
        self.connected_replicas.remove(naddr);
×
289
    }
×
290

291
    /// Make a chunk inv request
292
    pub fn make_getchunkinv(&self, rc_consensus_hash: &ConsensusHash) -> StacksMessageType {
3,894,191✔
293
        StacksMessageType::StackerDBGetChunkInv(StackerDBGetChunkInvData {
3,894,191✔
294
            contract_id: self.smart_contract_id.clone(),
3,894,191✔
295
            rc_consensus_hash: rc_consensus_hash.clone(),
3,894,191✔
296
        })
3,894,191✔
297
    }
3,894,191✔
298

299
    /// Given the downloaded set of chunk inventories, identify:
300
    /// * which chunks we need to fetch, because they're newer than ours.
301
    /// * what order to fetch chunks in, in rarest-first order
302
    /// Returns a list of (chunk requests, list of neighbors that can service them), which is
303
    /// ordered from rarest chunk to most-common chunk.
304
    pub fn make_chunk_request_schedule(
3,844,157✔
305
        &self,
3,844,157✔
306
        network: &PeerNetwork,
3,844,157✔
307
        local_slot_versions_opt: Option<Vec<u32>>,
3,844,157✔
308
    ) -> Result<Vec<(StackerDBGetChunkData, Vec<NeighborAddress>)>, net_error> {
3,844,157✔
309
        let rc_consensus_hash = network.get_chain_view().rc_consensus_hash.clone();
3,844,157✔
310
        let local_slot_versions = if let Some(local_slot_versions) = local_slot_versions_opt {
3,844,157✔
311
            local_slot_versions
62,005✔
312
        } else {
313
            self.stackerdbs.get_slot_versions(&self.smart_contract_id)?
3,782,152✔
314
        };
315

316
        let local_write_timestamps = self
3,844,157✔
317
            .stackerdbs
3,844,157✔
318
            .get_slot_write_timestamps(&self.smart_contract_id)?;
3,844,157✔
319

320
        if local_slot_versions.len() != local_write_timestamps.len() {
3,844,157✔
321
            let msg = format!("{}: Local slot versions ({}) out of sync with DB slot versions ({}); abandoning sync and trying again", &self.smart_contract_id, local_slot_versions.len(), local_write_timestamps.len());
×
322
            warn!("{}", &msg);
×
323
            return Err(net_error::Transient(msg));
×
324
        }
3,844,157✔
325

326
        let mut need_chunks: HashMap<usize, (StackerDBGetChunkData, Vec<NeighborAddress>)> =
3,844,157✔
327
            HashMap::new();
3,844,157✔
328
        let now = get_epoch_time_secs();
3,844,157✔
329

330
        // who has data we need?
331
        for ((i, local_version), write_ts) in local_slot_versions
9,517,820✔
332
            .iter()
3,844,157✔
333
            .enumerate()
3,844,157✔
334
            .zip(local_write_timestamps.iter())
3,844,157✔
335
        {
336
            if self.write_freq > 0 && write_ts + self.write_freq > now {
9,396,644✔
337
                debug!(
×
338
                    "{:?}: {}: Chunk {} was written too frequently ({} + {} > {}) in {}, so will not fetch chunk",
339
                    network.get_local_peer(),
×
340
                    &self.smart_contract_id,
×
341
                    i,
342
                    write_ts,
343
                    self.write_freq,
344
                    now,
345
                    &self.smart_contract_id,
×
346
                );
347
                continue;
×
348
            }
9,396,644✔
349

350
            for (naddr, chunk_inv) in self.chunk_invs.iter() {
9,396,674✔
351
                if chunk_inv.slot_versions.len() != local_slot_versions.len() {
8,630,412✔
352
                    // remote peer and our DB are out of sync, so just skip this
353
                    continue;
468✔
354
                }
8,629,944✔
355

356
                let Some(remote_version) = chunk_inv.slot_versions.get(i) else {
8,629,944✔
357
                    // remote peer isn't tracking this chunk
358
                    continue;
×
359
                };
360

361
                if local_version >= remote_version {
8,629,944✔
362
                    // remote peer has same view as local peer, or stale
363
                    continue;
8,551,993✔
364
                }
77,951✔
365

366
                let (request, available) = if let Some(x) = need_chunks.get_mut(&i) {
77,951✔
367
                    // someone has this chunk already
368
                    x
×
369
                } else {
370
                    // haven't seen anyone with this data yet.
371
                    // Add a record for it
372
                    need_chunks.insert(
77,951✔
373
                        i,
77,951✔
374
                        (
77,951✔
375
                            StackerDBGetChunkData {
77,951✔
376
                                contract_id: self.smart_contract_id.clone(),
77,951✔
377
                                rc_consensus_hash: rc_consensus_hash.clone(),
77,951✔
378
                                slot_id: i as u32,
77,951✔
379
                                slot_version: *remote_version,
77,951✔
380
                            },
77,951✔
381
                            vec![naddr.clone()],
77,951✔
382
                        ),
77,951✔
383
                    );
384
                    continue;
77,951✔
385
                };
386

387
                if request.slot_version < *remote_version {
×
388
                    // this peer has a newer view
×
389
                    available.clear();
×
390
                    available.push(naddr.clone());
×
391
                    *request = StackerDBGetChunkData {
×
392
                        contract_id: self.smart_contract_id.clone(),
×
393
                        rc_consensus_hash: rc_consensus_hash.clone(),
×
394
                        slot_id: i as u32,
×
395
                        slot_version: *remote_version,
×
396
                    };
×
397
                } else if request.slot_version == *remote_version {
×
398
                    // this peer has the same view as a prior peer.
×
399
                    // just track how many times we see this
×
400
                    available.push(naddr.clone());
×
401
                }
×
402
            }
403
        }
404

405
        // prioritize requests by rarest-chunk-first order, but choose neighbors in random order
406
        let mut schedule: Vec<_> = need_chunks
3,844,157✔
407
            .into_iter()
3,844,157✔
408
            .map(|(_, (stackerdb_getchunkdata, mut neighbors))| {
3,844,157✔
409
                neighbors.shuffle(&mut thread_rng());
77,951✔
410
                (stackerdb_getchunkdata, neighbors)
77,951✔
411
            })
77,951✔
412
            .collect();
3,844,157✔
413

414
        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
3,844,157✔
415
        schedule.reverse();
3,844,157✔
416

417
        debug!(
3,844,157✔
418
            "{:?}: {}: Will request up to {} chunks. Schedule: {:?}",
419
            network.get_local_peer(),
×
420
            &self.smart_contract_id,
×
421
            &schedule.len(),
×
422
            &schedule
×
423
        );
424
        Ok(schedule)
3,844,157✔
425
    }
3,844,157✔
426

427
    /// Given the downloaded set of chunk inventories, identify:
428
    /// * which chunks we need to push, because we have them and the neighbor does not
429
    /// * what order to push them in, in rarest-first order
430
    pub fn make_chunk_push_schedule(
3,781,439✔
431
        &self,
3,781,439✔
432
        network: &PeerNetwork,
3,781,439✔
433
    ) -> Result<Vec<(StackerDBPushChunkData, Vec<NeighborAddress>)>, net_error> {
3,781,439✔
434
        let rc_consensus_hash = network.get_chain_view().rc_consensus_hash.clone();
3,781,439✔
435
        let local_slot_versions = self.stackerdbs.get_slot_versions(&self.smart_contract_id)?;
3,781,439✔
436

437
        let mut need_chunks: HashMap<usize, (StackerDBPushChunkData, Vec<NeighborAddress>)> =
3,781,439✔
438
            HashMap::new();
3,781,439✔
439

440
        // who needs data we can serve?
441
        for (i, local_version) in local_slot_versions.iter().enumerate() {
9,213,587✔
442
            let mut local_chunk = None;
9,086,012✔
443
            for (naddr, chunk_inv) in self.chunk_invs.iter() {
9,086,022✔
444
                if chunk_inv.slot_versions.len() != local_slot_versions.len() {
8,319,760✔
445
                    // remote peer and our DB are out of sync, so just skip this
446
                    continue;
468✔
447
                }
8,319,292✔
448

449
                let Some(remote_version) = chunk_inv.slot_versions.get(i) else {
8,319,292✔
450
                    // remote peer isn't tracking this chunk
451
                    continue;
×
452
                };
453

454
                if local_version <= remote_version {
8,319,292✔
455
                    // remote peer has same or newer view than local peer
456
                    continue;
8,206,972✔
457
                }
112,320✔
458

459
                if local_chunk.is_none() {
112,320✔
460
                    let chunk_data = if let Some(chunk_data) = self.stackerdbs.get_chunk(
112,320✔
461
                        &self.smart_contract_id,
112,320✔
462
                        i as u32,
112,320✔
463
                        *local_version,
112,320✔
464
                    )? {
×
465
                        chunk_data
112,320✔
466
                    } else {
467
                        // we don't have this chunk
468
                        break;
×
469
                    };
470
                    local_chunk = Some(StackerDBPushChunkData {
112,320✔
471
                        contract_id: self.smart_contract_id.clone(),
112,320✔
472
                        rc_consensus_hash: rc_consensus_hash.clone(),
112,320✔
473
                        chunk_data,
112,320✔
474
                    });
112,320✔
475
                }
×
476

477
                let our_chunk = if let Some(chunk) = local_chunk.as_ref() {
112,320✔
478
                    chunk
112,320✔
479
                } else {
480
                    // we don't have this chunk
481
                    break;
×
482
                };
483

484
                // replicate with probability 1/num-outbound-replicas
485
                let do_replicate = if chunk_inv.num_outbound_replicas == 0 {
112,320✔
486
                    true
23,302✔
487
                } else {
488
                    thread_rng().gen::<u32>() % chunk_inv.num_outbound_replicas == 0
89,018✔
489
                };
490

491
                debug!(
112,320✔
492
                    "{:?}: {}: Can push chunk StackerDBChunk(id={},ver={}) to {}. Replicate? {}",
493
                    &network.get_local_peer(),
×
494
                    &self.smart_contract_id,
×
495
                    our_chunk.chunk_data.slot_id,
496
                    our_chunk.chunk_data.slot_version,
497
                    &naddr,
×
498
                    do_replicate
499
                );
500

501
                if !do_replicate {
112,320✔
502
                    continue;
13✔
503
                }
112,307✔
504

505
                if let Some((_, receivers)) = need_chunks.get_mut(&i) {
112,307✔
506
                    // someone needs this chunk already
×
507
                    receivers.push(naddr.clone());
×
508
                } else {
112,307✔
509
                    // haven't seen anyone that needs this data yet.
112,307✔
510
                    // Add a record for it.
112,307✔
511
                    need_chunks.insert(i, (our_chunk.clone(), vec![naddr.clone()]));
112,307✔
512
                };
112,307✔
513
            }
514
        }
515

516
        // prioritize requests by rarest-chunk-first order.
517
        // no need to randomize; we'll pick recipients at random
518
        let mut schedule: Vec<_> = need_chunks
3,781,439✔
519
            .into_iter()
3,781,439✔
520
            .map(|(_, (stackerdb_chunkdata, neighbors))| (stackerdb_chunkdata, neighbors))
3,781,465✔
521
            .collect();
3,781,439✔
522

523
        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
3,781,459✔
524
        debug!(
3,781,439✔
525
            "{:?}: {}: Will push up to {} chunks",
526
            network.get_local_peer(),
×
527
            &self.smart_contract_id,
×
528
            &schedule.len(),
×
529
        );
530
        Ok(schedule)
3,781,439✔
531
    }
3,781,439✔
532

533
    /// Validate a downloaded chunk
534
    pub fn validate_downloaded_chunk(
77,149✔
535
        &self,
77,149✔
536
        network: &PeerNetwork,
77,149✔
537
        config: &StackerDBConfig,
77,149✔
538
        data: &StackerDBChunkData,
77,149✔
539
    ) -> Result<bool, net_error> {
77,149✔
540
        // validate -- must be a valid chunk
541
        if !network.validate_received_chunk(
77,149✔
542
            &self.smart_contract_id,
77,149✔
543
            config,
77,149✔
544
            data,
77,149✔
545
            &self.expected_versions,
77,149✔
546
        )? {
×
547
            return Ok(false);
×
548
        }
77,149✔
549

550
        // no need to validate the timestamp, because we already skipped requesting it if it was
551
        // written too recently.
552

553
        Ok(true)
77,149✔
554
    }
77,149✔
555

556
    /// Store a downloaded chunk to RAM, and update bookkeeping
557
    pub fn add_downloaded_chunk(&mut self, naddr: NeighborAddress, data: StackerDBChunkData) {
77,149✔
558
        let slot_id = data.slot_id;
77,149✔
559
        let _slot_version = data.slot_version;
77,149✔
560

561
        if let Some(data_list) = self.downloaded_chunks.get_mut(&naddr) {
77,149✔
562
            data_list.push(data);
27,090✔
563
        } else {
50,779✔
564
            self.downloaded_chunks.insert(naddr.clone(), vec![data]);
50,059✔
565
        }
50,059✔
566

567
        self.chunk_fetch_priorities
77,149✔
568
            .retain(|(chunk, ..)| chunk.slot_id != slot_id);
112,852✔
569

570
        if !self.chunk_fetch_priorities.is_empty() {
77,149✔
571
            let next_chunk_fetch_priority =
27,369✔
572
                self.next_chunk_fetch_priority % self.chunk_fetch_priorities.len();
27,369✔
573
            self.next_chunk_fetch_priority = next_chunk_fetch_priority;
27,369✔
574
        }
50,680✔
575

576
        self.total_stored += 1;
77,149✔
577
    }
77,149✔
578

579
    /// Update bookkeeping about which chunks we have pushed.
580
    /// Stores the new chunk inventory to RAM.
581
    /// Returns true if the inventory changed (indicating that we need to resync)
582
    /// Returns false otherwise
583
    pub fn add_pushed_chunk(
62,005✔
584
        &mut self,
62,005✔
585
        _network: &PeerNetwork,
62,005✔
586
        naddr: NeighborAddress,
62,005✔
587
        new_inv: StackerDBChunkInvData,
62,005✔
588
        slot_id: u32,
62,005✔
589
    ) -> bool {
62,005✔
590
        // safety (should already be checked) -- don't accept if the size is wrong
591
        if new_inv.slot_versions.len() != self.num_slots {
62,005✔
592
            return false;
×
593
        }
62,005✔
594

595
        let need_resync = if let Some(old_inv) = self.chunk_invs.get(&naddr) {
62,005✔
596
            let mut resync = false;
62,005✔
597
            for (old_slot_id, (old_version, new_version)) in old_inv
148,443✔
598
                .slot_versions
62,005✔
599
                .iter()
62,005✔
600
                .zip(new_inv.slot_versions.iter())
62,005✔
601
                .enumerate()
62,005✔
602
            {
603
                if old_version < new_version {
148,443✔
604
                    // remote peer indicated that it has a newer version of this chunk.
605
                    debug!(
62,005✔
606
                        "{:?}: {}: peer {:?} has a newer version of slot {} ({} < {})",
607
                        _network.get_local_peer(),
×
608
                        &self.smart_contract_id,
×
609
                        &naddr,
×
610
                        old_slot_id,
611
                        old_version,
612
                        new_version,
613
                    );
614
                    resync = true;
62,005✔
615
                    break;
62,005✔
616
                }
86,438✔
617
            }
618
            resync
62,005✔
619
        } else {
620
            false
×
621
        };
622

623
        self.chunk_invs.insert(naddr, new_inv);
62,005✔
624

625
        self.chunk_push_priorities
62,005✔
626
            .retain(|(chunk, ..)| chunk.chunk_data.slot_id != slot_id);
111,516✔
627

628
        if !self.chunk_push_priorities.is_empty() {
62,005✔
629
            let next_chunk_push_priority =
35,797✔
630
                self.next_chunk_push_priority % self.chunk_push_priorities.len();
35,797✔
631
            self.next_chunk_push_priority = next_chunk_push_priority;
35,797✔
632
        }
39,928✔
633

634
        self.total_pushed += 1;
62,005✔
635
        need_resync
62,005✔
636
    }
62,005✔
637

638
    /// Ask inbound neighbors who replicate this DB for their chunk inventories.
639
    /// Don't send them a message if they're also outbound.
640
    /// Logs errors but does not return them.
641
    fn send_getchunkinv_to_inbound_neighbors(
3,874,277✔
642
        &mut self,
3,874,277✔
643
        network: &mut PeerNetwork,
3,874,277✔
644
        already_sent: &[NeighborAddress],
3,874,277✔
645
    ) {
3,874,277✔
646
        let sent_naddr_set: HashSet<_> = already_sent.iter().collect();
3,874,277✔
647
        let mut to_send = vec![];
3,874,277✔
648
        for event_id in network.iter_peer_event_ids() {
7,393,825✔
649
            let convo = if let Some(c) = network.get_p2p_convo(*event_id) {
7,393,825✔
650
                c
7,393,825✔
651
            } else {
652
                continue;
×
653
            };
654

655
            // only want inbound peers that replicate this DB
656
            if convo.is_outbound() {
7,393,825✔
657
                continue;
3,874,275✔
658
            }
3,519,550✔
659
            if !convo.replicates_stackerdb(&self.smart_contract_id) {
3,519,550✔
660
                continue;
2,321,497✔
661
            }
1,198,053✔
662

663
            let naddr = convo.to_neighbor_address();
1,198,053✔
664
            if sent_naddr_set.contains(&naddr) {
1,198,053✔
665
                continue;
19,260✔
666
            }
1,178,793✔
667

668
            let has_reciprocal_outbound = network
1,178,793✔
669
                .get_pubkey_events(&naddr.public_key_hash)
1,178,793✔
670
                .iter()
1,178,793✔
671
                .find(|event_id| {
1,803,060✔
672
                    if let Some(convo) = network.get_p2p_convo(**event_id) {
1,803,060✔
673
                        if !convo.is_outbound() {
1,803,060✔
674
                            return false;
624,267✔
675
                        }
1,178,793✔
676
                        let other_naddr = convo.to_neighbor_address();
1,178,793✔
677
                        if sent_naddr_set.contains(&other_naddr) {
1,178,793✔
678
                            return true;
1,174,518✔
679
                        }
4,275✔
680
                    }
×
681
                    return false;
4,275✔
682
                })
1,803,060✔
683
                .is_some();
1,178,793✔
684

685
            if has_reciprocal_outbound {
1,178,793✔
686
                // this inbound neighbor is also connected to us as an outbound neighbor, and we
687
                // already sent it a getchunkinv request
688
                continue;
1,174,518✔
689
            }
4,275✔
690

691
            let chunks_req = self.make_getchunkinv(&network.get_chain_view().rc_consensus_hash);
4,275✔
692
            to_send.push((naddr, chunks_req));
4,275✔
693
        }
694

695
        for (naddr, chunks_req) in to_send.into_iter() {
3,874,277✔
696
            debug!("{:?}: {}: send_getchunksinv_to_inbound_neighbors: Send StackerDBGetChunkInv at {} to inbound {:?}", network.get_local_peer(), &self.smart_contract_id, &network.get_chain_view().rc_consensus_hash, &naddr);
4,275✔
697
            if let Err(_e) = self.comms.neighbor_send(network, &naddr, chunks_req) {
4,275✔
698
                info!(
×
699
                    "{:?}: {}: Failed to send StackerDBGetChunkInv to inbound {:?}: {:?}",
700
                    network.get_local_peer(),
×
701
                    &self.smart_contract_id,
×
702
                    &naddr,
×
703
                    &_e
×
704
                );
705
            }
4,275✔
706
        }
707
    }
3,874,277✔
708

709
    /// Establish sessions with remote replicas.
710
    /// We might not be connected to any yet.
711
    /// Clears self.replicas, and fills in self.connected_replicas with already-connected neighbors
712
    /// Returns Ok(true) if we can proceed to sync
713
    /// Returns Ok(false) if we should try this again
714
    /// Returns Err(NoSuchNeighbor) if we don't have anyone to talk to
715
    /// Returns Err(..) on DB query error
716
    pub fn connect_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
10,725,190✔
717
        if self.replicas.is_empty() {
10,725,190✔
718
            // find some from the peer DB
719
            let replicas = self.find_qualified_replicas(network)?;
6,901,708✔
720
            self.replicas = replicas;
6,901,708✔
721
        }
3,823,482✔
722
        debug!(
10,725,190✔
723
            "{:?}: {}: connect_begin: establish StackerDB sessions to {} neighbors (out of {} p2p peers)",
724
            network.get_local_peer(),
×
725
            &self.smart_contract_id,
×
726
            self.replicas.len(),
×
727
            network.get_num_p2p_convos();
×
728
            "replicas" => ?self.replicas
729
        );
730
        if self.replicas.is_empty() {
10,725,190✔
731
            // nothing to do
732
            return Err(net_error::NoSuchNeighbor);
6,826,783✔
733
        }
3,898,407✔
734

735
        let naddrs = mem::replace(&mut self.replicas, HashSet::new());
3,898,407✔
736
        for naddr in naddrs.into_iter() {
3,918,672✔
737
            if self.comms.is_neighbor_connecting(network, &naddr) {
3,918,672✔
738
                debug!(
21,186✔
739
                    "{:?}: {}: connect_begin: already connecting to StackerDB peer {:?}",
740
                    network.get_local_peer(),
×
741
                    &self.smart_contract_id,
×
742
                    &naddr
×
743
                );
744
                self.replicas.insert(naddr);
21,186✔
745
                continue;
21,186✔
746
            }
3,897,486✔
747
            if self.comms.has_neighbor_session(network, &naddr) {
3,897,486✔
748
                debug!(
3,857,112✔
749
                    "{:?}: {}: connect_begin: already connected to StackerDB peer {:?}",
750
                    network.get_local_peer(),
×
751
                    &self.smart_contract_id,
×
752
                    &naddr
×
753
                );
754
                self.connected_replicas.insert(naddr);
3,857,112✔
755
                continue;
3,857,112✔
756
            }
40,374✔
757

758
            debug!(
40,374✔
759
                "{:?}: {}: connect_begin: Send Handshake to StackerDB peer {:?}",
760
                network.get_local_peer(),
×
761
                &self.smart_contract_id,
×
762
                &naddr
×
763
            );
764
            match self.comms.neighbor_session_begin(network, &naddr) {
40,374✔
765
                Ok(true) => {
766
                    // connected!
767
                    debug!(
39,195✔
768
                        "{:?}: {}: connect_begin: connected to StackerDB peer {:?}",
769
                        network.get_local_peer(),
×
770
                        &self.smart_contract_id,
×
771
                        &naddr
×
772
                    );
773
                    self.num_attempted_connections += 1;
39,195✔
774
                    self.num_connections += 1;
39,195✔
775
                    self.connected_replicas.insert(naddr);
39,195✔
776
                }
777
                Ok(false) => {
1,089✔
778
                    // need to retry
1,089✔
779
                    self.num_attempted_connections += 1;
1,089✔
780
                    self.replicas.insert(naddr);
1,089✔
781
                }
1,089✔
782
                Err(_e) => {
90✔
783
                    debug!(
90✔
784
                        "{:?}: {}: Failed to begin session with {:?}: {:?}",
785
                        &network.get_local_peer(),
×
786
                        &self.smart_contract_id,
×
787
                        &naddr,
×
788
                        &_e
×
789
                    );
790
                }
791
            }
792
        }
793
        Ok(!self.connected_replicas.is_empty())
3,898,407✔
794
    }
10,725,190✔
795

796
    /// Finish up connecting to our replicas.
797
    /// Fills in self.connected_replicas based on receipt of a handshake accept.
798
    /// Returns true if we've received all pending messages
799
    /// Returns false otherwise
800
    pub fn connect_try_finish(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,046,555✔
801
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,046,555✔
802
            let data = match message.payload {
37,980✔
803
                StacksMessageType::StackerDBHandshakeAccept(_, db_data) => {
37,980✔
804
                    if network.get_chain_view().rc_consensus_hash != db_data.rc_consensus_hash {
37,980✔
805
                        // stale or inconsistent view. Do not proceed
806
                        debug!(
3,771✔
807
                            "{:?}: {}: remote peer {:?} has stale view ({} != {})",
808
                            network.get_local_peer(),
×
809
                            &self.smart_contract_id,
×
810
                            &naddr,
×
811
                            &network.get_chain_view().rc_consensus_hash,
×
812
                            &db_data.rc_consensus_hash
×
813
                        );
814
                        // don't unpin, since it's usually transient
815
                        self.connected_replicas.remove(&naddr);
3,771✔
816
                        continue;
3,771✔
817
                    }
34,209✔
818
                    db_data
34,209✔
819
                }
820
                StacksMessageType::Nack(data) => {
×
821
                    debug!(
×
822
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBHandshake with code {}",
823
                        &network.get_local_peer(),
×
824
                        &self.smart_contract_id,
×
825
                        &naddr,
×
826
                        data.error_code
827
                    );
828
                    if data.error_code == NackErrorCodes::StaleView
×
829
                        || data.error_code == NackErrorCodes::FutureView
×
830
                    {
×
831
                        self.connected_replicas.remove(&naddr);
×
832
                        self.stale_neighbors.insert(naddr);
×
833
                    } else {
×
834
                        self.unpin_connected_replica(network, &naddr);
×
835
                    }
×
836
                    continue;
×
837
                }
838
                x => {
×
839
                    info!(
×
840
                        "{:?}: {}: Received unexpected message {:?}",
841
                        &network.get_local_peer(),
×
842
                        &self.smart_contract_id,
×
843
                        &x
×
844
                    );
845
                    continue;
×
846
                }
847
            };
848

849
            if data
34,209✔
850
                .smart_contracts
34,209✔
851
                .iter()
34,209✔
852
                .find(|db_id| *db_id == &self.smart_contract_id)
478,467✔
853
                .is_none()
34,209✔
854
            {
855
                debug!(
×
856
                    "{:?}: {}: remote peer does not replicate",
857
                    network.get_local_peer(),
×
858
                    &self.smart_contract_id
×
859
                );
860

861
                // disconnect
862
                self.unpin_connected_replica(network, &naddr);
×
863
                continue;
×
864
            }
34,209✔
865

866
            debug!(
34,209✔
867
                "{:?}: {}: connect_try_finish: Received StackerDBHandshakeAccept from {:?} for {:?}",
868
                network.get_local_peer(),
×
869
                &self.smart_contract_id,
×
870
                &naddr,
×
871
                &data
×
872
            );
873

874
            // this neighbor is good
875
            self.connected_replicas.insert(naddr);
34,209✔
876
        }
877

878
        if self.comms.count_inflight() > 0 {
4,046,555✔
879
            // still blocked
880
            return Ok(false);
171,909✔
881
        }
3,874,646✔
882

883
        if self.connected_replicas.is_empty() {
3,874,646✔
884
            // no one to talk to
885
            debug!(
369✔
886
                "{:?}: {}: connect_try_finish: no valid replicas",
887
                &self.smart_contract_id,
×
888
                network.get_local_peer()
×
889
            );
890
            return Err(net_error::PeerNotConnected(format!(
369✔
891
                "StackerDB connect_try_finish: no valid replicas for {}",
369✔
892
                &self.smart_contract_id
369✔
893
            )));
369✔
894
        }
3,874,277✔
895

896
        Ok(true)
3,874,277✔
897
    }
4,046,555✔
898

899
    /// Ask each replica for its chunk inventories.
900
    /// Also ask each inbound neighbor.
901
    /// Clears self.connected_replicas.
902
    /// StackerDBGetChunksInv
903
    /// Always succeeds; does not block.
904
    pub fn getchunksinv_begin(&mut self, network: &mut PeerNetwork) {
3,874,277✔
905
        let naddrs = mem::replace(&mut self.connected_replicas, HashSet::new());
3,874,277✔
906
        let mut already_sent = vec![];
3,874,277✔
907
        debug!(
3,874,277✔
908
            "{:?}: {}: getchunksinv_begin: Send StackerDBGetChunksInv to {} replicas",
909
            network.get_local_peer(),
×
910
            &self.smart_contract_id,
×
911
            naddrs.len();
×
912
            "connected_replicas" => ?naddrs,
913
        );
914
        for naddr in naddrs.into_iter() {
3,889,916✔
915
            debug!(
3,889,916✔
916
                "{:?}: {}: getchunksinv_begin: Send StackerDBGetChunksInv at {} to {:?}",
917
                network.get_local_peer(),
×
918
                &self.smart_contract_id,
×
919
                &network.get_chain_view().rc_consensus_hash,
×
920
                &naddr,
×
921
            );
922
            let chunks_req = self.make_getchunkinv(&network.get_chain_view().rc_consensus_hash);
3,889,916✔
923
            if let Err(e) = self.comms.neighbor_send(network, &naddr, chunks_req) {
3,889,916✔
924
                debug!(
20,151✔
925
                    "{:?}: {}: failed to send StackerDBGetChunkInv to {:?}: {:?}",
926
                    network.get_local_peer(),
×
927
                    &self.smart_contract_id,
×
928
                    &naddr,
×
929
                    &e
×
930
                );
931
                continue;
20,151✔
932
            }
3,869,765✔
933
            already_sent.push(naddr);
3,869,765✔
934
        }
935
        self.send_getchunkinv_to_inbound_neighbors(network, &already_sent);
3,874,277✔
936
    }
3,874,277✔
937

938
    /// Collect each chunk inventory request.
939
    /// Restores self.connected_replicas based on messages received.
940
    /// Return Ok(true) if we've received all pending messages
941
    /// Return Ok(false) if not
942
    pub fn getchunksinv_try_finish(
17,177,695✔
943
        &mut self,
17,177,695✔
944
        network: &mut PeerNetwork,
17,177,695✔
945
    ) -> Result<bool, net_error> {
17,177,695✔
946
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
17,177,695✔
947
            let chunk_inv_opt = match message.payload {
3,770,260✔
948
                StacksMessageType::StackerDBChunkInv(data) => {
3,476,961✔
949
                    if data.slot_versions.len() != self.num_slots {
3,476,961✔
950
                        info!("{:?}: {}: Received malformed StackerDBChunkInv from {:?}: expected {} chunks, got {}", network.get_local_peer(), &self.smart_contract_id, &naddr, self.num_slots, data.slot_versions.len());
2,727✔
951
                        None
2,727✔
952
                    } else {
953
                        Some(data)
3,474,234✔
954
                    }
955
                }
956
                StacksMessageType::Nack(data) => {
293,299✔
957
                    debug!(
293,299✔
958
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBGetChunksInv with code {}",
959
                        network.get_local_peer(),
×
960
                        &self.smart_contract_id,
×
961
                        &naddr,
×
962
                        data.error_code
963
                    );
964
                    if data.error_code == NackErrorCodes::StaleView
293,299✔
965
                        || data.error_code == NackErrorCodes::FutureView
90,672✔
966
                    {
293,299✔
967
                        self.connected_replicas.remove(&naddr);
293,299✔
968
                        self.stale_neighbors.insert(naddr);
293,299✔
969
                    } else {
293,299✔
970
                        self.unpin_connected_replica(network, &naddr);
×
971
                    }
×
972
                    continue;
293,299✔
973
                }
974
                x => {
×
975
                    info!(
×
976
                        "{:?}: {}: Received unexpected message {:?}",
977
                        network.get_local_peer(),
×
978
                        &self.smart_contract_id,
×
979
                        &x
×
980
                    );
981
                    self.unpin_connected_replica(network, &naddr);
×
982
                    continue;
×
983
                }
984
            };
985
            debug!(
3,476,961✔
986
                "{:?}: {}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}: {:?}",
987
                network.get_local_peer(),
×
988
                &self.smart_contract_id,
×
989
                &naddr,
×
990
                &chunk_inv_opt
×
991
            );
992

993
            if let Some(chunk_inv) = chunk_inv_opt {
3,476,961✔
994
                self.chunk_invs.insert(naddr.clone(), chunk_inv);
3,474,234✔
995
                self.connected_replicas.insert(naddr);
3,474,234✔
996
            }
3,474,234✔
997
        }
998
        if self.comms.count_inflight() > 0 {
17,177,695✔
999
            // not done yet, so blocked
1000
            return Ok(false);
13,395,543✔
1001
        }
3,782,152✔
1002

1003
        // got everything. Calculate download priority
1004
        let priorities = self.make_chunk_request_schedule(network, None)?;
3,782,152✔
1005
        let expected_versions = self.stackerdbs.get_slot_versions(&self.smart_contract_id)?;
3,782,152✔
1006

1007
        self.chunk_fetch_priorities = priorities;
3,782,152✔
1008
        self.expected_versions = expected_versions;
3,782,152✔
1009
        Ok(true)
3,782,152✔
1010
    }
17,177,695✔
1011

1012
    /// Ask each prioritized replica for some chunks we need.
1013
    /// Return Ok(true) if we processed all requested chunks
1014
    /// Return Ok(false) if there are still some requests to make
1015
    pub fn getchunks_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,119,063✔
1016
        if self.chunk_fetch_priorities.is_empty() {
4,119,063✔
1017
            // done
1018
            debug!(
3,843,444✔
1019
                "{:?}: {}: getchunks_begin: no chunks prioritized",
1020
                network.get_local_peer(),
×
1021
                &self.smart_contract_id
×
1022
            );
1023
            return Ok(true);
3,843,444✔
1024
        }
275,619✔
1025

1026
        let mut cur_priority = self.next_chunk_fetch_priority % self.chunk_fetch_priorities.len();
275,619✔
1027

1028
        debug!(
275,619✔
1029
            "{:?}: {}: getchunks_begin: Issue up to {} StackerDBGetChunk requests",
1030
            &network.get_local_peer(),
×
1031
            &self.smart_contract_id,
×
1032
            self.request_capacity;
1033
            "chunk_fetch_priorities" => ?self.chunk_fetch_priorities,
1034
        );
1035

1036
        let mut requested = 0;
275,619✔
1037
        let mut unpin = HashSet::new();
275,619✔
1038

1039
        // fill up our comms with $capacity requests
1040
        for _i in 0..self.request_capacity {
1,653,714✔
1041
            if self.comms.count_inflight() >= self.request_capacity {
1,653,714✔
1042
                break;
×
1043
            }
1,653,714✔
1044
            let cur_fetch_priority = self
1,653,714✔
1045
                .chunk_fetch_priorities
1,653,714✔
1046
                .get_mut(cur_priority)
1,653,714✔
1047
                .ok_or_else(|| {
1,653,714✔
1048
                    error!(
×
1049
                        "Error setting chunk fetch priories. Priority index out of bounds";
1050
                        "cur_priority" => cur_priority,
×
1051
                    );
1052
                    net_error::InvalidState
×
1053
                })?;
×
1054

1055
            let chunk_request = cur_fetch_priority.0.clone();
1,653,714✔
1056
            let selected_neighbor_opt = cur_fetch_priority
1,653,714✔
1057
                .1
1,653,714✔
1058
                .iter()
1,653,714✔
1059
                .enumerate()
1,653,714✔
1060
                .find(|(_i, naddr)| !self.comms.has_inflight(naddr));
1,653,714✔
1061

1062
            let (idx, selected_neighbor) = if let Some(x) = selected_neighbor_opt {
1,653,714✔
1063
                x
77,978✔
1064
            } else {
1065
                continue;
1,575,736✔
1066
            };
1067

1068
            debug!(
77,978✔
1069
                "{:?}: {}: getchunks_begin: Send StackerDBGetChunk(id={},ver={}) at {} to {}",
1070
                &network.get_local_peer(),
×
1071
                &self.smart_contract_id,
×
1072
                chunk_request.slot_id,
1073
                chunk_request.slot_version,
1074
                &chunk_request.rc_consensus_hash,
×
1075
                &selected_neighbor
×
1076
            );
1077

1078
            if let Err(e) = self.comms.neighbor_send(
77,978✔
1079
                network,
77,978✔
1080
                selected_neighbor,
77,978✔
1081
                StacksMessageType::StackerDBGetChunk(chunk_request.clone()),
77,978✔
1082
            ) {
77,978✔
1083
                info!(
162✔
1084
                    "{:?}: {} Failed to request chunk {} from {:?}: {:?}",
1085
                    network.get_local_peer(),
162✔
1086
                    &self.smart_contract_id,
162✔
1087
                    chunk_request.slot_id,
1088
                    selected_neighbor,
1089
                    &e
162✔
1090
                );
1091
                unpin.insert(selected_neighbor.clone());
162✔
1092
                continue;
162✔
1093
            }
77,816✔
1094

1095
            requested += 1;
77,816✔
1096

1097
            // don't ask this neighbor again
1098
            cur_fetch_priority.1.remove(idx);
77,816✔
1099

1100
            // next-prioritized chunk
1101
            cur_priority = (cur_priority + 1) % self.chunk_fetch_priorities.len();
77,816✔
1102
        }
1103
        let _ = unpin
275,619✔
1104
            .into_iter()
275,619✔
1105
            .map(|naddr| self.unpin_connected_replica(network, &naddr));
275,619✔
1106

1107
        if requested == 0 && self.comms.count_inflight() == 0 {
275,619✔
1108
            return Err(net_error::PeerNotConnected(format!(
324✔
1109
                "StackerDB getchunks_begin: no chunks to request"
324✔
1110
            )));
324✔
1111
        }
275,295✔
1112

1113
        self.next_chunk_fetch_priority = cur_priority;
275,295✔
1114

1115
        Ok(self.chunk_fetch_priorities.is_empty())
275,295✔
1116
    }
4,119,063✔
1117

1118
    /// Collect chunk replies from neighbors
1119
    /// Returns Ok(true) if all inflight messages have been received (or dealt with)
1120
    /// Returns Ok(false) otherwise
1121
    pub fn getchunks_try_finish(
4,118,739✔
1122
        &mut self,
4,118,739✔
1123
        network: &mut PeerNetwork,
4,118,739✔
1124
        config: &StackerDBConfig,
4,118,739✔
1125
    ) -> Result<bool, net_error> {
4,118,739✔
1126
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,118,739✔
1127
            let data = match message.payload {
77,500✔
1128
                StacksMessageType::StackerDBChunk(data) => data,
77,149✔
1129
                StacksMessageType::Nack(data) => {
351✔
1130
                    debug!(
351✔
1131
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBGetChunk with code {}",
1132
                        network.get_local_peer(),
×
1133
                        &self.smart_contract_id,
×
1134
                        &naddr,
×
1135
                        data.error_code
1136
                    );
1137
                    if data.error_code == NackErrorCodes::StaleView
351✔
1138
                        || data.error_code == NackErrorCodes::FutureView
135✔
1139
                    {
216✔
1140
                        self.stale_neighbors.insert(naddr);
216✔
1141
                    } else if data.error_code == NackErrorCodes::StaleVersion {
288✔
1142
                        // try again immediately, without throttling
99✔
1143
                        self.stale_inv = true;
99✔
1144
                    }
135✔
1145
                    continue;
351✔
1146
                }
1147
                x => {
×
1148
                    info!(
×
1149
                        "{:?}: {}: Received unexpected message {:?}",
1150
                        network.get_local_peer(),
×
1151
                        &self.smart_contract_id,
×
1152
                        &x
×
1153
                    );
1154
                    self.unpin_connected_replica(network, &naddr);
×
1155
                    continue;
×
1156
                }
1157
            };
1158

1159
            // validate
1160
            if !self.validate_downloaded_chunk(network, config, &data)? {
77,149✔
1161
                info!(
×
1162
                    "{:?}: {}: Remote neighbor {:?} served an invalid chunk for ID {}",
1163
                    network.get_local_peer(),
×
1164
                    &self.smart_contract_id,
×
1165
                    &naddr,
×
1166
                    data.slot_id
1167
                );
1168
                self.unpin_connected_replica(network, &naddr);
×
1169
                continue;
×
1170
            }
77,149✔
1171

1172
            // update bookkeeping
1173
            debug!(
77,149✔
1174
                "{:?}: {}, getchunks_try_finish: Received StackerDBChunk from {:?}",
1175
                network.get_local_peer(),
×
1176
                &self.smart_contract_id,
×
1177
                &naddr
×
1178
            );
1179
            self.add_downloaded_chunk(naddr, data);
77,149✔
1180
        }
1181

1182
        Ok(self.comms.count_inflight() == 0)
4,118,739✔
1183
    }
4,118,739✔
1184

1185
    /// Push out chunks to peers
1186
    /// Returns true if there are no more chunks to push.
1187
    /// Returns false if there are
1188
    pub fn pushchunks_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,064,689✔
1189
        if self.chunk_push_priorities.is_empty() && self.push_round != self.rounds {
4,064,689✔
1190
            // only do this once per round
1191
            let priorities = self.make_chunk_push_schedule(network)?;
3,781,439✔
1192
            self.chunk_push_priorities = priorities;
3,781,439✔
1193
            self.push_round = self.rounds;
3,781,439✔
1194
        }
283,250✔
1195
        if self.chunk_push_priorities.is_empty() {
4,064,689✔
1196
            // done
1197
            debug!(
3,745,197✔
1198
                "{:?}:{}: pushchunks_begin: no chunks prioritized",
1199
                network.get_local_peer(),
×
1200
                &self.smart_contract_id
×
1201
            );
1202
            return Ok(true);
3,745,197✔
1203
        }
319,492✔
1204

1205
        let mut cur_priority = self.next_chunk_push_priority % self.chunk_push_priorities.len();
319,492✔
1206

1207
        debug!(
319,492✔
1208
            "{:?}: {}: pushchunks_begin: Send up to {} StackerDBChunk pushes",
1209
            &network.get_local_peer(),
×
1210
            &self.smart_contract_id,
×
1211
            self.chunk_push_priorities.len();
×
1212
            "chunk_push_priorities" => ?self.chunk_push_priorities
1213
        );
1214

1215
        // fill up our comms with $capacity requests
1216
        let mut num_sent = 0;
319,492✔
1217
        for _i in 0..self.chunk_push_priorities.len() {
583,484✔
1218
            if self.comms.count_inflight() >= self.request_capacity {
583,484✔
1219
                break;
×
1220
            }
583,484✔
1221
            let cur_push_priority = self
583,484✔
1222
                .chunk_push_priorities
583,484✔
1223
                .get_mut(cur_priority)
583,484✔
1224
                .ok_or_else(|| {
583,484✔
1225
                    error!(
×
1226
                        "Error setting chunk push priories. Priority index out of bounds";
1227
                        "cur_priority" => cur_priority,
×
1228
                    );
1229
                    net_error::InvalidState
×
1230
                })?;
×
1231

1232
            let chunk_push = cur_push_priority.0.clone();
583,484✔
1233
            // try the first neighbor in the chunk_push_priorities list
1234
            let selected_neighbor_opt = cur_push_priority.1.first().map(|neighbor| (0, neighbor));
583,484✔
1235

1236
            let Some((idx, selected_neighbor)) = selected_neighbor_opt else {
583,484✔
1237
                debug!("{:?}: {}: pushchunks_begin: no available neighbor to send StackerDBChunk(id={},ver={}) to",
471,123✔
1238
                    &network.get_local_peer(),
×
1239
                    &self.smart_contract_id,
×
1240
                    chunk_push.chunk_data.slot_id,
1241
                    chunk_push.chunk_data.slot_version,
1242
                );
1243

1244
                // next-prioritized chunk
1245
                cur_priority = (cur_priority + 1) % self.chunk_push_priorities.len();
471,123✔
1246
                continue;
471,123✔
1247
            };
1248

1249
            debug!(
112,361✔
1250
                "{:?}: {}: pushchunks_begin: Send StackerDBChunk(id={},ver={}) at {} to {}",
1251
                &network.get_local_peer(),
×
1252
                &self.smart_contract_id,
×
1253
                chunk_push.chunk_data.slot_id,
1254
                chunk_push.chunk_data.slot_version,
1255
                &chunk_push.rc_consensus_hash,
×
1256
                &selected_neighbor
×
1257
            );
1258

1259
            let slot_id = chunk_push.chunk_data.slot_id;
112,361✔
1260
            let slot_version = chunk_push.chunk_data.slot_version;
112,361✔
1261
            if let Err(e) = self.comms.neighbor_send(
112,361✔
1262
                network,
112,361✔
1263
                selected_neighbor,
112,361✔
1264
                StacksMessageType::StackerDBPushChunk(chunk_push),
112,361✔
1265
            ) {
112,361✔
1266
                info!(
54✔
1267
                    "{:?}: {}: Failed to send chunk {} from {:?}: {:?}",
1268
                    network.get_local_peer(),
54✔
1269
                    &self.smart_contract_id,
54✔
1270
                    slot_id,
1271
                    selected_neighbor,
1272
                    &e
54✔
1273
                );
1274
                continue;
54✔
1275
            }
112,307✔
1276

1277
            // record what we just sent
1278
            self.chunk_push_receipts
112,307✔
1279
                .insert(selected_neighbor.clone(), (slot_id, slot_version));
112,307✔
1280

1281
            // don't send to this neighbor again
1282
            cur_push_priority.1.remove(idx);
112,307✔
1283

1284
            // next-prioritized chunk
1285
            cur_priority = (cur_priority + 1) % self.chunk_push_priorities.len();
112,307✔
1286

1287
            num_sent += 1;
112,307✔
1288
            if num_sent > self.request_capacity {
112,307✔
1289
                break;
3✔
1290
            }
112,304✔
1291
        }
1292
        self.next_chunk_push_priority = cur_priority;
319,492✔
1293
        Ok(self
319,492✔
1294
            .chunk_push_priorities
319,492✔
1295
            .iter()
319,492✔
1296
            .fold(0usize, |acc, (_chunk, num_naddrs)| {
583,490✔
1297
                acc.saturating_add(num_naddrs.len())
583,490✔
1298
            })
583,490✔
1299
            == 0)
1300
    }
4,064,689✔
1301

1302
    /// Collect push-chunk replies from neighbors.
1303
    /// If a remote neighbor replies with a chunk-inv for a pushed chunk which contains newer data
1304
    /// than we have, then set `self.need_resync` to true.
1305
    /// Returns true if all inflight messages have been received (or dealt with)
1306
    /// Returns false otherwise
1307
    pub fn pushchunks_try_finish(&mut self, network: &mut PeerNetwork) -> bool {
4,064,689✔
1308
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,064,689✔
1309
            let new_chunk_inv = match message.payload {
62,005✔
1310
                StacksMessageType::StackerDBChunkInv(data) => data,
62,005✔
1311
                StacksMessageType::Nack(data) => {
×
1312
                    debug!(
×
1313
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBChunk with code {}",
1314
                        network.get_local_peer(),
×
1315
                        &self.smart_contract_id,
×
1316
                        &naddr,
×
1317
                        data.error_code
1318
                    );
1319
                    if data.error_code == NackErrorCodes::StaleView
×
1320
                        || data.error_code == NackErrorCodes::FutureView
×
1321
                    {
×
1322
                        self.stale_neighbors.insert(naddr);
×
1323
                    }
×
1324
                    continue;
×
1325
                }
1326
                x => {
×
1327
                    info!(
×
1328
                        "{:?}: {}: Received unexpected message {:?}",
1329
                        network.get_local_peer(),
×
1330
                        &self.smart_contract_id,
×
1331
                        &x
×
1332
                    );
1333
                    continue;
×
1334
                }
1335
            };
1336

1337
            // must be well-formed
1338
            if new_chunk_inv.slot_versions.len() != self.num_slots {
62,005✔
1339
                info!("{:?}: {}: Received malformed StackerDBChunkInv from {:?}: expected {} chunks, got {}", network.get_local_peer(), &self.smart_contract_id, &naddr, self.num_slots, new_chunk_inv.slot_versions.len());
×
1340
                continue;
×
1341
            }
62,005✔
1342

1343
            // update bookkeeping
1344
            debug!(
62,005✔
1345
                "{:?}: {}: pushchunks_try_finish: Received StackerDBChunkInv from {:?}",
1346
                network.get_local_peer(),
×
1347
                &self.smart_contract_id,
×
1348
                &naddr
×
1349
            );
1350

1351
            if let Some((slot_id, _)) = self.chunk_push_receipts.get(&naddr) {
62,005✔
1352
                self.need_resync = self.need_resync
62,005✔
1353
                    || self.add_pushed_chunk(network, naddr, new_chunk_inv, *slot_id);
62,005✔
1354
            }
×
1355
        }
1356

1357
        let inflight = self.comms.count_inflight();
4,064,689✔
1358
        debug!(
4,064,689✔
1359
            "{:?}: {}: inflight messages: {:?}",
1360
            network.get_local_peer(),
×
1361
            &self.smart_contract_id,
×
1362
            inflight
1363
        );
1364
        inflight == 0
4,064,689✔
1365
    }
4,064,689✔
1366

1367
    /// Recalculate the download schedule based on chunkinvs received on push
1368
    pub fn recalculate_chunk_request_schedule(
62,005✔
1369
        &mut self,
62,005✔
1370
        network: &PeerNetwork,
62,005✔
1371
    ) -> Result<(), net_error> {
62,005✔
1372
        // figure out the new expected versions
1373
        let mut expected_versions = vec![0u32; self.num_slots];
62,005✔
1374
        for (_, chunk_inv) in self.chunk_invs.iter() {
62,007✔
1375
            for (slot_version, expected_version) in chunk_inv
307,113✔
1376
                .slot_versions
62,007✔
1377
                .iter()
62,007✔
1378
                .zip(expected_versions.iter_mut())
62,007✔
1379
            {
307,113✔
1380
                *expected_version = (*slot_version).max(*expected_version);
307,113✔
1381
            }
307,113✔
1382
        }
1383

1384
        let priorities =
62,005✔
1385
            self.make_chunk_request_schedule(network, Some(expected_versions.clone()))?;
62,005✔
1386

1387
        self.chunk_fetch_priorities = priorities;
62,005✔
1388
        self.expected_versions = expected_versions;
62,005✔
1389
        Ok(())
62,005✔
1390
    }
62,005✔
1391

1392
    /// Forcibly wake up the state machine if it is throttled
1393
    pub fn wakeup(&mut self) {
2,196,546✔
1394
        debug!("wake up StackerDB sync for {}", &self.smart_contract_id);
2,196,546✔
1395
        self.last_run_ts = 0;
2,196,546✔
1396
    }
2,196,546✔
1397

1398
    /// Run the state machine.
1399
    /// If we run to completion, then reset and return the sync result.
1400
    /// Otherwise, if there's still more work to do, then return None
1401
    pub fn run(
213,736,981✔
1402
        &mut self,
213,736,981✔
1403
        network: &mut PeerNetwork,
213,736,981✔
1404
        config: &StackerDBConfig,
213,736,981✔
1405
    ) -> Result<Option<StackerDBSyncResult>, net_error> {
213,736,981✔
1406
        if network.get_connection_opts().disable_stackerdb_sync {
213,736,981✔
1407
            test_debug!(
61✔
1408
                "{:?}: stacker DB sync is disabled",
1409
                network.get_local_peer()
×
1410
            );
1411
            return Ok(None);
61✔
1412
        }
213,736,920✔
1413

1414
        // make sure we have an up-to-date chain view.
1415
        // If not, then abort and immediately retry the sync (since any queued messages we have are
1416
        // likely gonna fail)
1417
        if let Some(rc_consensus_hash) = self.rc_consensus_hash.as_ref() {
213,736,920✔
1418
            if network.get_chain_view().rc_consensus_hash != *rc_consensus_hash {
203,095,777✔
1419
                debug!("{:?}: {}: Resetting and restarting running StackerDB sync due to chain view change", network.get_local_peer(), &self.smart_contract_id);
1,724,364✔
1420
                let result = self.reset(Some(network), config);
1,724,364✔
1421
                self.state = StackerDBSyncState::ConnectBegin;
1,724,364✔
1422
                self.rc_consensus_hash = Some(network.get_chain_view().rc_consensus_hash.clone());
1,724,364✔
1423
                self.wakeup();
1,724,364✔
1424
                return Ok(Some(result));
1,724,364✔
1425
            }
201,371,413✔
1426
        } else {
10,641,143✔
1427
            self.rc_consensus_hash = Some(network.get_chain_view().rc_consensus_hash.clone());
10,641,143✔
1428
        }
10,641,143✔
1429

1430
        // throttle to write_freq
1431
        if self.last_run_ts + config.write_freq.max(1) > get_epoch_time_secs() {
212,012,556✔
1432
            debug!(
187,317,297✔
1433
                "{:?}: {}: stacker DB sync is throttled until {}",
1434
                network.get_local_peer(),
×
1435
                &self.smart_contract_id,
×
1436
                self.last_run_ts + config.write_freq
×
1437
            );
1438
            return Ok(None);
187,317,297✔
1439
        }
24,695,259✔
1440

1441
        loop {
1442
            debug!(
54,615,174✔
1443
                "{:?}: {}: stacker DB sync state is {:?}",
1444
                network.get_local_peer(),
×
1445
                &self.smart_contract_id,
×
1446
                &self.state
×
1447
            );
1448

1449
            let mut blocked = true;
54,615,174✔
1450
            match self.state {
54,615,174✔
1451
                StackerDBSyncState::ConnectBegin => {
1452
                    let done = match self.connect_begin(network) {
10,724,996✔
1453
                        Ok(done) => done,
3,898,217✔
1454
                        Err(net_error::NoSuchNeighbor) => {
1455
                            // nothing to do
1456
                            self.state = StackerDBSyncState::Finished;
6,826,779✔
1457
                            blocked = false;
6,826,779✔
1458
                            false
6,826,779✔
1459
                        }
1460
                        Err(e) => {
×
1461
                            return Err(e);
×
1462
                        }
1463
                    };
1464
                    if done {
10,724,996✔
1465
                        self.state = StackerDBSyncState::ConnectFinish;
3,875,861✔
1466
                        blocked = false;
3,875,861✔
1467
                    }
9,925,499✔
1468
                }
1469
                StackerDBSyncState::ConnectFinish => {
1470
                    let done = self.connect_try_finish(network)?;
4,046,555✔
1471
                    if done {
4,046,186✔
1472
                        self.state = StackerDBSyncState::GetChunksInvBegin;
3,874,277✔
1473
                        blocked = false;
3,874,277✔
1474
                    }
3,907,946✔
1475
                }
1476
                StackerDBSyncState::GetChunksInvBegin => {
3,874,277✔
1477
                    // does not block
3,874,277✔
1478
                    self.getchunksinv_begin(network);
3,874,277✔
1479
                    self.state = StackerDBSyncState::GetChunksInvFinish;
3,874,277✔
1480
                    blocked = false;
3,874,277✔
1481
                }
3,874,277✔
1482
                StackerDBSyncState::GetChunksInvFinish => {
1483
                    let done = self.getchunksinv_try_finish(network)?;
17,177,695✔
1484
                    if done {
17,177,695✔
1485
                        self.state = StackerDBSyncState::GetChunks;
3,782,152✔
1486
                        blocked = false;
3,782,152✔
1487
                    }
13,395,543✔
1488
                }
1489
                StackerDBSyncState::GetChunks => {
1490
                    if network.get_connection_opts().disable_stackerdb_get_chunks {
4,119,063✔
1491
                        // fault injection -- force the system to rely exclusively on push-chunk
1492
                        // behavior
1493
                        self.state = StackerDBSyncState::PushChunks;
×
1494
                        continue;
×
1495
                    }
4,119,063✔
1496

1497
                    let requests_finished = self.getchunks_begin(network)?;
4,119,063✔
1498
                    let inflight_finished = self.getchunks_try_finish(network, config)?;
4,118,739✔
1499
                    let done = requests_finished && inflight_finished;
4,118,739✔
1500
                    if done {
4,118,739✔
1501
                        self.state = StackerDBSyncState::PushChunks;
3,843,444✔
1502
                        blocked = false;
3,843,444✔
1503
                    }
3,843,444✔
1504
                }
1505
                StackerDBSyncState::PushChunks => {
1506
                    let pushes_finished = self.pushchunks_begin(network)?;
4,064,689✔
1507
                    let inflight_finished = self.pushchunks_try_finish(network);
4,064,689✔
1508
                    let done = pushes_finished && inflight_finished;
4,064,689✔
1509
                    if done {
4,064,689✔
1510
                        if self.need_resync
3,843,125✔
1511
                            && !network.get_connection_opts().disable_stackerdb_get_chunks
62,005✔
1512
                        {
1513
                            // someone pushed newer chunk data to us, and getting chunks is
1514
                            // enabled, so immediately go request them
1515
                            debug!(
62,005✔
1516
                                "{:?}: {}: immediately retry StackerDB GetChunks due to PushChunk NACK",
1517
                                network.get_local_peer(),
×
1518
                                &self.smart_contract_id
×
1519
                            );
1520
                            self.recalculate_chunk_request_schedule(network)?;
62,005✔
1521
                            self.state = StackerDBSyncState::GetChunks;
62,005✔
1522
                        } else {
3,781,120✔
1523
                            // done syncing
3,781,120✔
1524
                            self.state = StackerDBSyncState::Finished;
3,781,120✔
1525
                        }
3,781,120✔
1526
                        self.need_resync = false;
3,843,125✔
1527
                        blocked = false;
3,843,125✔
1528
                    }
221,564✔
1529
                }
1530
                StackerDBSyncState::Finished => {
1531
                    let stale_inv = self.stale_inv;
10,607,899✔
1532

1533
                    let result = self.reset(Some(network), config);
10,607,899✔
1534
                    self.state = StackerDBSyncState::ConnectBegin;
10,607,899✔
1535

1536
                    if stale_inv {
10,607,899✔
1537
                        debug!(
×
1538
                            "{:?}: {}: immediately retry StackerDB sync due to stale inventory",
1539
                            network.get_local_peer(),
×
1540
                            &self.smart_contract_id
×
1541
                        );
1542
                        self.wakeup();
×
1543
                    }
10,607,899✔
1544
                    return Ok(Some(result));
10,607,899✔
1545
                }
1546
            };
1547

1548
            if blocked {
44,006,582✔
1549
                return Ok(None);
14,086,667✔
1550
            }
29,919,915✔
1551
        }
1552
    }
213,736,981✔
1553
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc