• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

stacks-network / stacks-core / 24833235977-1

23 Apr 2026 11:42AM UTC coverage: 85.744% (+0.03%) from 85.712%
24833235977-1

Pull #7068

github

8b8063
web-flow
Merge df9ed4b6b into 69fc5e169
Pull Request #7068: chore: refactor special_map to simplify logic

14 of 22 new or added lines in 1 file covered. (63.64%)

3174 existing lines in 85 files now uncovered.

187182 of 218304 relevant lines covered (85.74%)

17361502.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.06
/stackslib/src/net/stackerdb/sync.rs
1
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
2
// Copyright (C) 2020-2023 Stacks Open Internet Foundation
3
//
4
// This program is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8
//
9
// This program is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13
//
14
// You should have received a copy of the GNU General Public License
15
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
use std::collections::{HashMap, HashSet};
18
use std::mem;
19

20
use clarity::vm::types::QualifiedContractIdentifier;
21
use rand::prelude::SliceRandom;
22
use rand::{thread_rng, Rng};
23
use stacks_common::types::chainstate::ConsensusHash;
24
use stacks_common::util::get_epoch_time_secs;
25

26
use crate::net::db::PeerDB;
27
use crate::net::neighbors::comms::ToNeighborKey;
28
use crate::net::neighbors::NeighborComms;
29
use crate::net::p2p::PeerNetwork;
30
use crate::net::stackerdb::{
31
    StackerDBConfig, StackerDBSync, StackerDBSyncResult, StackerDBSyncState, StackerDBs,
32
};
33
use crate::net::{
34
    Error as net_error, NackErrorCodes, NeighborAddress, StackerDBChunkData, StackerDBChunkInvData,
35
    StackerDBGetChunkData, StackerDBGetChunkInvData, StackerDBPushChunkData, StacksMessageType,
36
};
37

38
/// Initial value of `request_capacity` for a newly-created `StackerDBSync`.
const MAX_CHUNKS_IN_FLIGHT: usize = 6;
39
/// Initial value of `max_neighbors` for a newly-created `StackerDBSync`.
/// `find_qualified_replicas` stops collecting replicas once it holds `max_neighbors` of them.
const MAX_DB_NEIGHBORS: usize = 32;
40

41
impl<NC: NeighborComms> StackerDBSync<NC> {
42
    pub fn new(
7,698✔
43
        smart_contract: QualifiedContractIdentifier,
7,698✔
44
        config: &StackerDBConfig,
7,698✔
45
        comms: NC,
7,698✔
46
        stackerdbs: StackerDBs,
7,698✔
47
    ) -> StackerDBSync<NC> {
7,698✔
48
        let mut dbsync = StackerDBSync {
7,698✔
49
            state: StackerDBSyncState::ConnectBegin,
7,698✔
50
            rc_consensus_hash: None,
7,698✔
51
            smart_contract_id: smart_contract,
7,698✔
52
            num_slots: config.num_slots() as usize,
7,698✔
53
            write_freq: config.write_freq,
7,698✔
54
            chunk_invs: HashMap::new(),
7,698✔
55
            chunk_fetch_priorities: vec![],
7,698✔
56
            chunk_push_priorities: vec![],
7,698✔
57
            chunk_push_receipts: HashMap::new(),
7,698✔
58
            next_chunk_fetch_priority: 0,
7,698✔
59
            next_chunk_push_priority: 0,
7,698✔
60
            expected_versions: vec![],
7,698✔
61
            downloaded_chunks: HashMap::new(),
7,698✔
62
            replicas: HashSet::new(),
7,698✔
63
            connected_replicas: HashSet::new(),
7,698✔
64
            comms,
7,698✔
65
            stackerdbs,
7,698✔
66
            request_capacity: MAX_CHUNKS_IN_FLIGHT,
7,698✔
67
            max_neighbors: MAX_DB_NEIGHBORS,
7,698✔
68
            total_stored: 0,
7,698✔
69
            total_pushed: 0,
7,698✔
70
            last_run_ts: 0,
7,698✔
71
            need_resync: false,
7,698✔
72
            stale_inv: false,
7,698✔
73
            stale_neighbors: HashSet::new(),
7,698✔
74
            num_connections: 0,
7,698✔
75
            num_attempted_connections: 0,
7,698✔
76
            rounds: 0,
7,698✔
77
            push_round: 0,
7,698✔
78
            last_eviction_time: get_epoch_time_secs(),
7,698✔
79
        };
7,698✔
80
        dbsync.reset(None, config);
7,698✔
81
        dbsync
7,698✔
82
    }
7,698✔
83

84
    /// Find stackerdb replicas and apply filtering rules
85
    fn find_qualified_replicas(
18,631,590✔
86
        &self,
18,631,590✔
87
        network: &PeerNetwork,
18,631,590✔
88
    ) -> Result<HashSet<NeighborAddress>, net_error> {
18,631,590✔
89
        let mut found = HashSet::new();
18,631,590✔
90
        let mut min_age =
18,631,590✔
91
            get_epoch_time_secs().saturating_sub(network.get_connection_opts().max_neighbor_age);
18,631,590✔
92

93
        let local_naddr = network.get_local_peer().to_neighbor_addr();
18,631,590✔
94

95
        while found.len() < self.max_neighbors {
37,263,171✔
96
            let peers_iter = PeerDB::find_stacker_db_replicas(
37,263,171✔
97
                network.peerdb_conn(),
37,263,171✔
98
                network.get_local_peer().network_id,
37,263,171✔
99
                &self.smart_contract_id,
37,263,171✔
100
                min_age,
37,263,171✔
101
                self.max_neighbors,
37,263,171✔
102
            )?
×
103
            .into_iter()
37,263,171✔
104
            .map(|neighbor| {
37,263,181✔
105
                (
8,725,686✔
106
                    NeighborAddress::from_neighbor(&neighbor),
8,725,686✔
107
                    neighbor.last_contact_time,
8,725,686✔
108
                )
8,725,686✔
109
            })
8,725,686✔
110
            .filter(|(naddr, _)| {
37,263,181✔
111
                if naddr.addrbytes.is_anynet() {
8,725,686✔
112
                    return false;
×
113
                }
8,725,686✔
114
                if naddr.public_key_hash == local_naddr.public_key_hash {
8,725,686✔
115
                    // don't talk to us by another address
116
                    return false;
×
117
                }
8,725,686✔
118
                if !network.get_connection_opts().private_neighbors
8,725,686✔
119
                    && naddr.addrbytes.is_in_private_range()
×
120
                {
121
                    return false;
×
122
                }
8,725,686✔
123
                true
8,725,686✔
124
            });
8,725,686✔
125

126
            for (peer, last_contact) in peers_iter {
37,263,181✔
127
                found.insert(peer);
8,725,686✔
128
                if found.len() >= self.max_neighbors {
8,725,686✔
129
                    break;
18✔
130
                }
4,668,644✔
131
                min_age = min_age.min(last_contact);
4,668,644✔
132
            }
133

134
            // search for older neighbors
135
            if min_age > 1 {
37,263,171✔
136
                min_age = 1;
18,631,581✔
137
            } else if min_age <= 1 {
18,631,590✔
138
                break;
18,631,590✔
139
            }
×
140
        }
141
        Ok(found)
18,631,590✔
142
    }
18,631,590✔
143

144
    /// Calculate the new set of replicas to contact.
145
    /// This is the same as the set that was connected on the last sync, plus any
146
    /// config hints and discovered nodes from the DB.
147
    fn find_new_replicas(
12,119,940✔
148
        &self,
12,119,940✔
149
        mut connected_replicas: HashSet<NeighborAddress>,
12,119,940✔
150
        network: Option<&PeerNetwork>,
12,119,940✔
151
        config: &StackerDBConfig,
12,119,940✔
152
    ) -> Result<HashSet<NeighborAddress>, net_error> {
12,119,940✔
153
        // keep all connected replicas, and replenish from config hints and the DB as needed
154
        let mut peers = config.hint_replicas.clone();
12,119,940✔
155
        if let Some(network) = network {
12,119,940✔
156
            let extra_peers = self.find_qualified_replicas(network)?;
12,058,880✔
157
            peers.extend(extra_peers);
12,058,880✔
158
        }
61,060✔
159

160
        peers.shuffle(&mut thread_rng());
12,119,940✔
161

162
        for peer in peers {
12,119,940✔
163
            if connected_replicas.len() >= config.max_neighbors {
4,293,575✔
164
                break;
18✔
165
            }
2,297,781✔
166
            connected_replicas.insert(peer);
2,297,781✔
167
        }
168
        Ok(connected_replicas)
12,119,940✔
169
    }
12,119,940✔
170

171
    /// Reset this state machine, and get the StackerDBSyncResult with newly-obtained chunk data
172
    /// and newly-learned information about connection statistics
173
    pub fn reset(
12,119,940✔
174
        &mut self,
12,119,940✔
175
        network: Option<&PeerNetwork>,
12,119,940✔
176
        config: &StackerDBConfig,
12,119,940✔
177
    ) -> StackerDBSyncResult {
12,119,940✔
178
        debug!(
12,119,940✔
179
            "{}: Reset with config {:?}",
180
            &self.smart_contract_id, config
×
181
        );
182
        let mut chunks = vec![];
12,119,940✔
183
        let downloaded_chunks = mem::replace(&mut self.downloaded_chunks, HashMap::new());
12,119,940✔
184
        for (_, mut data) in downloaded_chunks.into_iter() {
12,119,940✔
185
            chunks.append(&mut data);
48,952✔
186
        }
48,952✔
187

188
        let chunk_invs = mem::replace(&mut self.chunk_invs, HashMap::new());
12,119,940✔
189
        let result = StackerDBSyncResult {
12,119,940✔
190
            contract_id: self.smart_contract_id.clone(),
12,119,940✔
191
            chunk_invs,
12,119,940✔
192
            chunks_to_store: chunks,
12,119,940✔
193
            stale: std::mem::replace(&mut self.stale_neighbors, HashSet::new()),
12,119,940✔
194
            num_connections: self.num_connections,
12,119,940✔
195
            num_attempted_connections: self.num_attempted_connections,
12,119,940✔
196
        };
12,119,940✔
197

198
        // keep all connected replicas, and replenish from config hints and the DB as needed
199
        let connected_replicas = mem::replace(&mut self.connected_replicas, HashSet::new());
12,119,940✔
200
        let next_connected_replicas =
12,119,940✔
201
            if let Ok(new_replicas) = self.find_new_replicas(connected_replicas, network, config) {
12,119,940✔
202
                new_replicas
12,119,940✔
203
            } else {
204
                self.replicas.clone()
×
205
            };
206

207
        self.replicas = next_connected_replicas;
12,119,940✔
208

209
        self.chunk_fetch_priorities.clear();
12,119,940✔
210
        self.chunk_push_priorities.clear();
12,119,940✔
211
        self.next_chunk_fetch_priority = 0;
12,119,940✔
212
        self.next_chunk_push_priority = 0;
12,119,940✔
213
        self.chunk_push_receipts.clear();
12,119,940✔
214
        self.expected_versions.clear();
12,119,940✔
215
        self.downloaded_chunks.clear();
12,119,940✔
216

217
        // reset comms, but keep all connected replicas pinned.
218
        // Randomly evict one every so often.
219
        self.comms.reset();
12,119,940✔
220
        if let Some(network) = network {
12,119,940✔
221
            let mut eviction_index = None;
12,058,880✔
222
            if self.last_eviction_time + 60 < get_epoch_time_secs() {
12,058,880✔
223
                self.last_eviction_time = get_epoch_time_secs();
145,548✔
224
                if !self.replicas.is_empty() {
145,548✔
225
                    eviction_index = Some(thread_rng().gen_range(0..self.replicas.len()));
55,638✔
226
                }
135,099✔
227
            }
11,913,332✔
228

229
            let remove_naddr = eviction_index.and_then(|idx| {
12,058,880✔
230
                let removed = self.replicas.iter().nth(idx).cloned();
55,638✔
231
                if let Some(naddr) = removed.as_ref() {
55,638✔
232
                    debug!(
55,638✔
233
                        "{:?}: {}: don't reuse connection for replica {:?}",
234
                        network.get_local_peer(),
×
235
                        &self.smart_contract_id,
×
236
                        &naddr,
×
237
                    );
238
                }
×
239
                removed
55,638✔
240
            });
55,638✔
241

242
            if let Some(naddr) = remove_naddr {
12,058,880✔
243
                self.replicas.remove(&naddr);
55,656✔
244
            }
12,003,224✔
245

246
            // retain the remaining replica connections
247
            for naddr in self.replicas.iter() {
5,378,807✔
248
                if let Some(event_id) = network.get_event_id(&naddr.to_neighbor_key(network)) {
2,270,286✔
249
                    self.comms.pin_connection(event_id);
2,264,148✔
250
                    debug!(
2,264,148✔
251
                        "{:?}: {}: reuse connection for replica {:?} on event {}",
252
                        network.get_local_peer(),
×
253
                        &self.smart_contract_id,
×
254
                        &naddr,
×
255
                        event_id
256
                    );
257
                }
40,572✔
258
            }
259
        }
61,060✔
260

261
        // reload from config
262
        self.num_slots = config.num_slots() as usize;
12,119,940✔
263
        self.write_freq = config.write_freq;
12,119,940✔
264

265
        self.need_resync = false;
12,119,940✔
266
        self.stale_inv = false;
12,119,940✔
267
        self.last_run_ts = get_epoch_time_secs();
12,119,940✔
268

269
        self.state = StackerDBSyncState::ConnectBegin;
12,119,940✔
270
        self.num_connections = 0;
12,119,940✔
271
        self.num_attempted_connections = 0;
12,119,940✔
272
        self.rounds += 1;
12,119,940✔
273
        self.rc_consensus_hash = None;
12,119,940✔
274
        result
12,119,940✔
275
    }
12,119,940✔
276

277
    /// Get the set of connection IDs in use
278
    pub fn get_pinned_connections(&self) -> &HashSet<usize> {
84,418,034✔
279
        self.comms.get_pinned_connections()
84,418,034✔
280
    }
84,418,034✔
281

282
    /// Unpin and remove a connected replica by naddr
283
    pub fn unpin_connected_replica(&mut self, network: &PeerNetwork, naddr: &NeighborAddress) {
288✔
284
        let nk = naddr.to_neighbor_key(network);
288✔
285
        if let Some(event_id) = network.get_event_id(&nk) {
288✔
286
            self.comms.unpin_connection(event_id);
54✔
287
        }
288✔
288
        self.connected_replicas.remove(naddr);
288✔
289
    }
288✔
290

291
    /// Make a chunk inv request
292
    pub fn make_getchunkinv(&self, rc_consensus_hash: &ConsensusHash) -> StacksMessageType {
3,852,970✔
293
        StacksMessageType::StackerDBGetChunkInv(StackerDBGetChunkInvData {
3,852,970✔
294
            contract_id: self.smart_contract_id.clone(),
3,852,970✔
295
            rc_consensus_hash: rc_consensus_hash.clone(),
3,852,970✔
296
        })
3,852,970✔
297
    }
3,852,970✔
298

299
    /// Given the downloaded set of chunk inventories, identify:
300
    /// * which chunks we need to fetch, because they're newer than ours.
301
    /// * what order to fetch chunks in, in rarest-first order
302
    /// Returns a list of (chunk requests, list of neighbors that can service them), which is
303
    /// ordered from rarest chunk to most-common chunk.
304
    pub fn make_chunk_request_schedule(
3,833,384✔
305
        &self,
3,833,384✔
306
        network: &PeerNetwork,
3,833,384✔
307
        local_slot_versions_opt: Option<Vec<u32>>,
3,833,384✔
308
    ) -> Result<Vec<(StackerDBGetChunkData, Vec<NeighborAddress>)>, net_error> {
3,833,384✔
309
        let rc_consensus_hash = network.get_chain_view().rc_consensus_hash.clone();
3,833,384✔
310
        let local_slot_versions = if let Some(local_slot_versions) = local_slot_versions_opt {
3,833,384✔
311
            local_slot_versions
58,190✔
312
        } else {
313
            self.stackerdbs.get_slot_versions(&self.smart_contract_id)?
3,775,194✔
314
        };
315

316
        let local_write_timestamps = self
3,833,384✔
317
            .stackerdbs
3,833,384✔
318
            .get_slot_write_timestamps(&self.smart_contract_id)?;
3,833,384✔
319

320
        if local_slot_versions.len() != local_write_timestamps.len() {
3,833,384✔
321
            let msg = format!("{}: Local slot versions ({}) out of sync with DB slot versions ({}); abandoning sync and trying again", &self.smart_contract_id, local_slot_versions.len(), local_write_timestamps.len());
×
322
            warn!("{}", &msg);
×
323
            return Err(net_error::Transient(msg));
×
324
        }
3,833,384✔
325

326
        let mut need_chunks: HashMap<usize, (StackerDBGetChunkData, Vec<NeighborAddress>)> =
3,833,384✔
327
            HashMap::new();
3,833,384✔
328
        let now = get_epoch_time_secs();
3,833,384✔
329

330
        // who has data we need?
331
        for ((i, local_version), write_ts) in local_slot_versions
9,493,394✔
332
            .iter()
3,833,384✔
333
            .enumerate()
3,833,384✔
334
            .zip(local_write_timestamps.iter())
3,833,384✔
335
        {
336
            if self.write_freq > 0 && write_ts + self.write_freq > now {
9,381,623✔
337
                debug!(
×
338
                    "{:?}: {}: Chunk {} was written too frequently ({} + {} > {}) in {}, so will not fetch chunk",
339
                    network.get_local_peer(),
×
340
                    &self.smart_contract_id,
×
341
                    i,
342
                    write_ts,
343
                    self.write_freq,
344
                    now,
345
                    &self.smart_contract_id,
×
346
                );
347
                continue;
×
348
            }
9,381,623✔
349

350
            for (naddr, chunk_inv) in self.chunk_invs.iter() {
9,381,693✔
351
                if chunk_inv.slot_versions.len() != local_slot_versions.len() {
8,251,973✔
352
                    // remote peer and our DB are out of sync, so just skip this
353
                    continue;
468✔
354
                }
8,251,505✔
355

356
                let Some(remote_version) = chunk_inv.slot_versions.get(i) else {
8,251,505✔
357
                    // remote peer isn't tracking this chunk
358
                    continue;
×
359
                };
360

361
                if local_version >= remote_version {
8,251,505✔
362
                    // remote peer has same view as local peer, or stale
363
                    continue;
8,172,571✔
364
                }
78,934✔
365

366
                let (request, available) = if let Some(x) = need_chunks.get_mut(&i) {
78,934✔
367
                    // someone has this chunk already
368
                    x
×
369
                } else {
370
                    // haven't seen anyone with this data yet.
371
                    // Add a record for it
372
                    need_chunks.insert(
78,934✔
373
                        i,
78,934✔
374
                        (
78,934✔
375
                            StackerDBGetChunkData {
78,934✔
376
                                contract_id: self.smart_contract_id.clone(),
78,934✔
377
                                rc_consensus_hash: rc_consensus_hash.clone(),
78,934✔
378
                                slot_id: i as u32,
78,934✔
379
                                slot_version: *remote_version,
78,934✔
380
                            },
78,934✔
381
                            vec![naddr.clone()],
78,934✔
382
                        ),
78,934✔
383
                    );
384
                    continue;
78,934✔
385
                };
386

387
                if request.slot_version < *remote_version {
×
388
                    // this peer has a newer view
×
389
                    available.clear();
×
390
                    available.push(naddr.clone());
×
391
                    *request = StackerDBGetChunkData {
×
392
                        contract_id: self.smart_contract_id.clone(),
×
393
                        rc_consensus_hash: rc_consensus_hash.clone(),
×
394
                        slot_id: i as u32,
×
395
                        slot_version: *remote_version,
×
396
                    };
×
397
                } else if request.slot_version == *remote_version {
×
398
                    // this peer has the same view as a prior peer.
×
399
                    // just track how many times we see this
×
400
                    available.push(naddr.clone());
×
401
                }
×
402
            }
403
        }
404

405
        // prioritize requests by rarest-chunk-first order, but choose neighbors in random order
406
        let mut schedule: Vec<_> = need_chunks
3,833,384✔
407
            .into_iter()
3,833,384✔
408
            .map(|(_, (stackerdb_getchunkdata, mut neighbors))| {
3,833,391✔
409
                neighbors.shuffle(&mut thread_rng());
78,934✔
410
                (stackerdb_getchunkdata, neighbors)
78,934✔
411
            })
78,934✔
412
            .collect();
3,833,384✔
413

414
        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
3,833,390✔
415
        schedule.reverse();
3,833,384✔
416

417
        debug!(
3,833,384✔
418
            "{:?}: {}: Will request up to {} chunks. Schedule: {:?}",
419
            network.get_local_peer(),
×
420
            &self.smart_contract_id,
×
421
            &schedule.len(),
×
422
            &schedule
×
423
        );
424
        Ok(schedule)
3,833,384✔
425
    }
3,833,384✔
426

427
    /// Given the downloaded set of chunk inventories, identify:
428
    /// * which chunks we need to push, because we have them and the neighbor does not
429
    /// * what order to push them in, in rarest-first order
430
    pub fn make_chunk_push_schedule(
3,774,282✔
431
        &self,
3,774,282✔
432
        network: &PeerNetwork,
3,774,282✔
433
    ) -> Result<Vec<(StackerDBPushChunkData, Vec<NeighborAddress>)>, net_error> {
3,774,282✔
434
        let rc_consensus_hash = network.get_chain_view().rc_consensus_hash.clone();
3,774,282✔
435
        let local_slot_versions = self.stackerdbs.get_slot_versions(&self.smart_contract_id)?;
3,774,282✔
436

437
        let mut need_chunks: HashMap<usize, (StackerDBPushChunkData, Vec<NeighborAddress>)> =
3,774,282✔
438
            HashMap::new();
3,774,282✔
439

440
        // who needs data we can serve?
441
        for (i, local_version) in local_slot_versions.iter().enumerate() {
9,205,746✔
442
            let mut local_chunk = None;
9,088,710✔
443
            for (naddr, chunk_inv) in self.chunk_invs.iter() {
9,088,750✔
444
                if chunk_inv.slot_versions.len() != local_slot_versions.len() {
7,959,030✔
445
                    // remote peer and our DB are out of sync, so just skip this
446
                    continue;
468✔
447
                }
7,958,562✔
448

449
                let Some(remote_version) = chunk_inv.slot_versions.get(i) else {
7,958,562✔
450
                    // remote peer isn't tracking this chunk
451
                    continue;
×
452
                };
453

454
                if local_version <= remote_version {
7,958,562✔
455
                    // remote peer has same or newer view than local peer
456
                    continue;
7,856,237✔
457
                }
102,325✔
458

459
                if local_chunk.is_none() {
102,325✔
460
                    let chunk_data = if let Some(chunk_data) = self.stackerdbs.get_chunk(
102,325✔
461
                        &self.smart_contract_id,
102,325✔
462
                        i as u32,
102,325✔
463
                        *local_version,
102,325✔
464
                    )? {
×
465
                        chunk_data
102,325✔
466
                    } else {
467
                        // we don't have this chunk
468
                        break;
×
469
                    };
470
                    local_chunk = Some(StackerDBPushChunkData {
102,325✔
471
                        contract_id: self.smart_contract_id.clone(),
102,325✔
472
                        rc_consensus_hash: rc_consensus_hash.clone(),
102,325✔
473
                        chunk_data,
102,325✔
474
                    });
102,325✔
UNCOV
475
                }
×
476

477
                let our_chunk = if let Some(chunk) = local_chunk.as_ref() {
102,325✔
478
                    chunk
102,325✔
479
                } else {
480
                    // we don't have this chunk
481
                    break;
×
482
                };
483

484
                // replicate with probability 1/num-outbound-replicas
485
                let do_replicate = if chunk_inv.num_outbound_replicas == 0 {
102,325✔
486
                    true
23,544✔
487
                } else {
488
                    thread_rng().gen::<u32>() % chunk_inv.num_outbound_replicas == 0
78,781✔
489
                };
490

491
                debug!(
102,325✔
492
                    "{:?}: {}: Can push chunk StackerDBChunk(id={},ver={}) to {}. Replicate? {}",
493
                    &network.get_local_peer(),
×
494
                    &self.smart_contract_id,
×
495
                    our_chunk.chunk_data.slot_id,
496
                    our_chunk.chunk_data.slot_version,
497
                    &naddr,
×
498
                    do_replicate
499
                );
500

501
                if !do_replicate {
102,325✔
502
                    continue;
15✔
503
                }
102,310✔
504

505
                if let Some((_, receivers)) = need_chunks.get_mut(&i) {
102,310✔
UNCOV
506
                    // someone needs this chunk already
×
UNCOV
507
                    receivers.push(naddr.clone());
×
508
                } else {
102,310✔
509
                    // haven't seen anyone that needs this data yet.
102,310✔
510
                    // Add a record for it.
102,310✔
511
                    need_chunks.insert(i, (our_chunk.clone(), vec![naddr.clone()]));
102,310✔
512
                };
102,310✔
513
            }
514
        }
515

516
        // prioritize requests by rarest-chunk-first order.
517
        // no need to randomize; we'll pick recipients at random
518
        let mut schedule: Vec<_> = need_chunks
3,774,282✔
519
            .into_iter()
3,774,282✔
520
            .map(|(_, (stackerdb_chunkdata, neighbors))| (stackerdb_chunkdata, neighbors))
3,774,305✔
521
            .collect();
3,774,282✔
522

523
        schedule.sort_by(|item_1, item_2| item_1.1.len().cmp(&item_2.1.len()));
3,774,299✔
524
        debug!(
3,774,282✔
525
            "{:?}: {}: Will push up to {} chunks",
526
            network.get_local_peer(),
×
527
            &self.smart_contract_id,
×
528
            &schedule.len(),
×
529
        );
530
        Ok(schedule)
3,774,282✔
531
    }
3,774,282✔
532

533
    /// Validate a downloaded chunk
534
    pub fn validate_downloaded_chunk(
77,845✔
535
        &self,
77,845✔
536
        network: &PeerNetwork,
77,845✔
537
        config: &StackerDBConfig,
77,845✔
538
        data: &StackerDBChunkData,
77,845✔
539
    ) -> Result<bool, net_error> {
77,845✔
540
        // validate -- must be a valid chunk
541
        if !network.validate_received_chunk(
77,845✔
542
            &self.smart_contract_id,
77,845✔
543
            config,
77,845✔
544
            data,
77,845✔
545
            &self.expected_versions,
77,845✔
546
        )? {
×
547
            return Ok(false);
54✔
548
        }
77,791✔
549

550
        // no need to validate the timestamp, because we already skipped requesting it if it was
551
        // written too recently.
552

553
        Ok(true)
77,791✔
554
    }
77,845✔
555

556
    /// Store a downloaded chunk to RAM, and update bookkeeping
557
    pub fn add_downloaded_chunk(&mut self, naddr: NeighborAddress, data: StackerDBChunkData) {
77,791✔
558
        let slot_id = data.slot_id;
77,791✔
559
        let _slot_version = data.slot_version;
77,791✔
560

561
        if let Some(data_list) = self.downloaded_chunks.get_mut(&naddr) {
77,791✔
562
            data_list.push(data);
28,810✔
563
        } else {
49,943✔
564
            self.downloaded_chunks.insert(naddr.clone(), vec![data]);
48,981✔
565
        }
48,981✔
566

567
        self.chunk_fetch_priorities
77,791✔
568
            .retain(|(chunk, ..)| chunk.slot_id != slot_id);
117,021✔
569

570
        if !self.chunk_fetch_priorities.is_empty() {
77,791✔
571
            let next_chunk_fetch_priority =
29,315✔
572
                self.next_chunk_fetch_priority % self.chunk_fetch_priorities.len();
29,315✔
573
            self.next_chunk_fetch_priority = next_chunk_fetch_priority;
29,315✔
574
        }
50,124✔
575

576
        self.total_stored += 1;
77,791✔
577
    }
77,791✔
578

579
    /// Update bookkeeping about which chunks we have pushed.
580
    /// Stores the new chunk inventory to RAM.
581
    /// Returns true if the inventory changed (indicating that we need to resync)
582
    /// Returns false otherwise
583
    pub fn add_pushed_chunk(
58,190✔
584
        &mut self,
58,190✔
585
        _network: &PeerNetwork,
58,190✔
586
        naddr: NeighborAddress,
58,190✔
587
        new_inv: StackerDBChunkInvData,
58,190✔
588
        slot_id: u32,
58,190✔
589
    ) -> bool {
58,190✔
590
        // safety (should already be checked) -- don't accept if the size is wrong
591
        if new_inv.slot_versions.len() != self.num_slots {
58,190✔
592
            return false;
×
593
        }
58,190✔
594

595
        let need_resync = if let Some(old_inv) = self.chunk_invs.get(&naddr) {
58,190✔
596
            let mut resync = false;
58,190✔
597
            for (old_slot_id, (old_version, new_version)) in old_inv
143,121✔
598
                .slot_versions
58,190✔
599
                .iter()
58,190✔
600
                .zip(new_inv.slot_versions.iter())
58,190✔
601
                .enumerate()
58,190✔
602
            {
603
                if old_version < new_version {
143,121✔
604
                    // remote peer indicated that it has a newer version of this chunk.
605
                    debug!(
58,190✔
606
                        "{:?}: {}: peer {:?} has a newer version of slot {} ({} < {})",
607
                        _network.get_local_peer(),
×
608
                        &self.smart_contract_id,
×
609
                        &naddr,
×
610
                        old_slot_id,
611
                        old_version,
612
                        new_version,
613
                    );
614
                    resync = true;
58,190✔
615
                    break;
58,190✔
616
                }
84,931✔
617
            }
618
            resync
58,190✔
619
        } else {
620
            false
×
621
        };
622

623
        self.chunk_invs.insert(naddr, new_inv);
58,190✔
624

625
        self.chunk_push_priorities
58,190✔
626
            .retain(|(chunk, ..)| chunk.chunk_data.slot_id != slot_id);
101,713✔
627

628
        if !self.chunk_push_priorities.is_empty() {
58,190✔
629
            let next_chunk_push_priority =
31,982✔
630
                self.next_chunk_push_priority % self.chunk_push_priorities.len();
31,982✔
631
            self.next_chunk_push_priority = next_chunk_push_priority;
31,982✔
632
        }
35,789✔
633

634
        self.total_pushed += 1;
58,190✔
635
        need_resync
58,190✔
636
    }
58,190✔
637

638
    /// Ask inbound neighbors who replicate this DB for their chunk inventories.
639
    /// Don't send them a message if they're also outbound.
640
    /// Logs errors but does not return them.
641
    fn send_getchunkinv_to_inbound_neighbors(
3,831,005✔
642
        &mut self,
3,831,005✔
643
        network: &mut PeerNetwork,
3,831,005✔
644
        already_sent: &[NeighborAddress],
3,831,005✔
645
    ) {
3,831,005✔
646
        let sent_naddr_set: HashSet<_> = already_sent.iter().collect();
3,831,005✔
647
        let mut to_send = vec![];
3,831,005✔
648
        for event_id in network.iter_peer_event_ids() {
7,265,359✔
649
            let convo = if let Some(c) = network.get_p2p_convo(*event_id) {
7,265,359✔
650
                c
7,265,359✔
651
            } else {
652
                continue;
×
653
            };
654

655
            // only want inbound peers that replicate this DB
656
            if convo.is_outbound() {
7,265,359✔
657
                continue;
3,830,768✔
658
            }
3,434,591✔
659
            if !convo.replicates_stackerdb(&self.smart_contract_id) {
3,434,591✔
660
                continue;
2,332,872✔
661
            }
1,101,719✔
662

663
            let naddr = convo.to_neighbor_address();
1,101,719✔
664
            if sent_naddr_set.contains(&naddr) {
1,101,719✔
665
                continue;
21,519✔
666
            }
1,080,200✔
667

668
            let has_reciprocal_outbound = network
1,080,200✔
669
                .get_pubkey_events(&naddr.public_key_hash)
1,080,200✔
670
                .iter()
1,080,200✔
671
                .find(|event_id| {
1,726,798✔
672
                    if let Some(convo) = network.get_p2p_convo(**event_id) {
1,726,798✔
673
                        if !convo.is_outbound() {
1,726,798✔
674
                            return false;
646,598✔
675
                        }
1,080,200✔
676
                        let other_naddr = convo.to_neighbor_address();
1,080,200✔
677
                        if sent_naddr_set.contains(&other_naddr) {
1,080,200✔
678
                            return true;
1,074,926✔
679
                        }
5,274✔
680
                    }
×
681
                    return false;
5,274✔
682
                })
1,726,798✔
683
                .is_some();
1,080,200✔
684

685
            if has_reciprocal_outbound {
1,080,200✔
686
                // this inbound neighbor is also connected to us as an outbound neighbor, and we
687
                // already sent it a getchunkinv request
688
                continue;
1,074,926✔
689
            }
5,274✔
690

691
            let chunks_req = self.make_getchunkinv(&network.get_chain_view().rc_consensus_hash);
5,274✔
692
            to_send.push((naddr, chunks_req));
5,274✔
693
        }
694

695
        for (naddr, chunks_req) in to_send.into_iter() {
3,831,005✔
696
            debug!("{:?}: {}: send_getchunksinv_to_inbound_neighbors: Send StackerDBGetChunkInv at {} to inbound {:?}", network.get_local_peer(), &self.smart_contract_id, &network.get_chain_view().rc_consensus_hash, &naddr);
5,274✔
697
            if let Err(_e) = self.comms.neighbor_send(network, &naddr, chunks_req) {
5,274✔
698
                info!(
×
699
                    "{:?}: {}: Failed to send StackerDBGetChunkInv to inbound {:?}: {:?}",
700
                    network.get_local_peer(),
×
701
                    &self.smart_contract_id,
×
702
                    &naddr,
×
703
                    &_e
×
704
                );
705
            }
5,274✔
706
        }
707
    }
3,831,005✔
708

709
    /// Establish sessions with remote replicas.
710
    /// We might not be connected to any yet.
711
    /// Clears self.replicas, and fills in self.connected_replicas with already-connected neighbors
712
    /// Returns Ok(true) if we can proceed to sync
713
    /// Returns Ok(false) if we should try this again
714
    /// Returns Err(NoSuchNeighbor) if we don't have anyone to talk to
715
    /// Returns Err(..) on DB query error
716
    pub fn connect_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
10,357,402✔
717
        if self.replicas.is_empty() {
10,357,402✔
718
            // find some from the peer DB
719
            let replicas = self.find_qualified_replicas(network)?;
6,572,710✔
720
            self.replicas = replicas;
6,572,710✔
721
        }
3,784,692✔
722
        debug!(
10,357,402✔
723
            "{:?}: {}: connect_begin: establish StackerDB sessions to {} neighbors (out of {} p2p peers)",
724
            network.get_local_peer(),
×
725
            &self.smart_contract_id,
×
726
            self.replicas.len(),
×
727
            network.get_num_p2p_convos();
×
728
            "replicas" => ?self.replicas
729
        );
730
        if self.replicas.is_empty() {
10,357,402✔
731
            // nothing to do
732
            return Err(net_error::NoSuchNeighbor);
6,503,442✔
733
        }
3,853,960✔
734

735
        let naddrs = mem::replace(&mut self.replicas, HashSet::new());
3,853,960✔
736
        for naddr in naddrs.into_iter() {
3,875,745✔
737
            if self.comms.is_neighbor_connecting(network, &naddr) {
3,875,745✔
738
                debug!(
21,429✔
739
                    "{:?}: {}: connect_begin: already connecting to StackerDB peer {:?}",
740
                    network.get_local_peer(),
×
741
                    &self.smart_contract_id,
×
742
                    &naddr
×
743
                );
744
                self.replicas.insert(naddr);
21,429✔
745
                continue;
21,429✔
746
            }
3,854,316✔
747
            if self.comms.has_neighbor_session(network, &naddr) {
3,854,316✔
748
                debug!(
3,813,366✔
749
                    "{:?}: {}: connect_begin: already connected to StackerDB peer {:?}",
750
                    network.get_local_peer(),
×
751
                    &self.smart_contract_id,
×
752
                    &naddr
×
753
                );
754
                self.connected_replicas.insert(naddr);
3,813,366✔
755
                continue;
3,813,366✔
756
            }
40,950✔
757

758
            debug!(
40,950✔
759
                "{:?}: {}: connect_begin: Send Handshake to StackerDB peer {:?}",
760
                network.get_local_peer(),
×
761
                &self.smart_contract_id,
×
762
                &naddr
×
763
            );
764
            match self.comms.neighbor_session_begin(network, &naddr) {
40,950✔
765
                Ok(true) => {
766
                    // connected!
767
                    debug!(
39,789✔
768
                        "{:?}: {}: connect_begin: connected to StackerDB peer {:?}",
769
                        network.get_local_peer(),
×
770
                        &self.smart_contract_id,
×
771
                        &naddr
×
772
                    );
773
                    self.num_attempted_connections += 1;
39,789✔
774
                    self.num_connections += 1;
39,789✔
775
                    self.connected_replicas.insert(naddr);
39,789✔
776
                }
777
                Ok(false) => {
1,062✔
778
                    // need to retry
1,062✔
779
                    self.num_attempted_connections += 1;
1,062✔
780
                    self.replicas.insert(naddr);
1,062✔
781
                }
1,062✔
782
                Err(_e) => {
99✔
783
                    debug!(
99✔
784
                        "{:?}: {}: Failed to begin session with {:?}: {:?}",
785
                        &network.get_local_peer(),
×
786
                        &self.smart_contract_id,
×
787
                        &naddr,
×
788
                        &_e
×
789
                    );
790
                }
791
            }
792
        }
793
        Ok(!self.connected_replicas.is_empty())
3,853,960✔
794
    }
10,357,402✔
795

796
    /// Finish up connecting to our replicas.
797
    /// Fills in self.connected_replicas based on receipt of a handshake accept.
798
    /// Returns true if we've received all pending messages
799
    /// Returns false otherwise
800
    pub fn connect_try_finish(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,029,617✔
801
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,029,617✔
802
            let data = match message.payload {
39,555✔
803
                StacksMessageType::StackerDBHandshakeAccept(_, db_data) => {
39,555✔
804
                    if network.get_chain_view().rc_consensus_hash != db_data.rc_consensus_hash {
39,555✔
805
                        // stale or inconsistent view. Do not proceed
806
                        debug!(
4,527✔
807
                            "{:?}: {}: remote peer {:?} has stale view ({} != {})",
808
                            network.get_local_peer(),
×
809
                            &self.smart_contract_id,
×
810
                            &naddr,
×
811
                            &network.get_chain_view().rc_consensus_hash,
×
812
                            &db_data.rc_consensus_hash
×
813
                        );
814
                        // don't unpin, since it's usually transient
815
                        self.connected_replicas.remove(&naddr);
4,527✔
816
                        continue;
4,527✔
817
                    }
35,028✔
818
                    db_data
35,028✔
819
                }
820
                StacksMessageType::Nack(data) => {
×
821
                    debug!(
×
822
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBHandshake with code {}",
823
                        &network.get_local_peer(),
×
824
                        &self.smart_contract_id,
×
825
                        &naddr,
×
826
                        data.error_code
827
                    );
828
                    if data.error_code == NackErrorCodes::StaleView
×
829
                        || data.error_code == NackErrorCodes::FutureView
×
830
                    {
×
831
                        self.connected_replicas.remove(&naddr);
×
832
                        self.stale_neighbors.insert(naddr);
×
833
                    } else {
×
834
                        self.unpin_connected_replica(network, &naddr);
×
835
                    }
×
836
                    continue;
×
837
                }
838
                x => {
×
839
                    info!(
×
840
                        "{:?}: {}: Received unexpected message {:?}",
841
                        &network.get_local_peer(),
×
842
                        &self.smart_contract_id,
×
843
                        &x
×
844
                    );
845
                    continue;
×
846
                }
847
            };
848

849
            if data
35,028✔
850
                .smart_contracts
35,028✔
851
                .iter()
35,028✔
852
                .find(|db_id| *db_id == &self.smart_contract_id)
494,352✔
853
                .is_none()
35,028✔
854
            {
855
                debug!(
234✔
856
                    "{:?}: {}: remote peer does not replicate",
857
                    network.get_local_peer(),
×
858
                    &self.smart_contract_id
×
859
                );
860

861
                // disconnect
862
                self.unpin_connected_replica(network, &naddr);
234✔
863
                continue;
234✔
864
            }
34,794✔
865

866
            debug!(
34,794✔
867
                "{:?}: {}: connect_try_finish: Received StackerDBHandshakeAccept from {:?} for {:?}",
868
                network.get_local_peer(),
×
869
                &self.smart_contract_id,
×
870
                &naddr,
×
871
                &data
×
872
            );
873

874
            // this neighbor is good
875
            self.connected_replicas.insert(naddr);
34,794✔
876
        }
877

878
        if self.comms.count_inflight() > 0 {
4,029,617✔
879
            // still blocked
880
            return Ok(false);
198,243✔
881
        }
3,831,374✔
882

883
        if self.connected_replicas.is_empty() {
3,831,374✔
884
            // no one to talk to
885
            debug!(
369✔
886
                "{:?}: {}: connect_try_finish: no valid replicas",
887
                &self.smart_contract_id,
×
888
                network.get_local_peer()
×
889
            );
890
            return Err(net_error::PeerNotConnected(format!(
369✔
891
                "StackerDB connect_try_finish: no valid replicas for {}",
369✔
892
                &self.smart_contract_id
369✔
893
            )));
369✔
894
        }
3,831,005✔
895

896
        Ok(true)
3,831,005✔
897
    }
4,029,617✔
898

899
    /// Ask each replica for its chunk inventories.
900
    /// Also ask each inbound neighbor.
901
    /// Clears self.connected_replicas.
902
    /// StackerDBGetChunksInv
903
    /// Always succeeds; does not block.
904
    pub fn getchunksinv_begin(&mut self, network: &mut PeerNetwork) {
3,831,005✔
905
        let naddrs = mem::replace(&mut self.connected_replicas, HashSet::new());
3,831,005✔
906
        let mut already_sent = vec![];
3,831,005✔
907
        debug!(
3,831,005✔
908
            "{:?}: {}: getchunksinv_begin: Send StackerDBGetChunksInv to {} replicas",
909
            network.get_local_peer(),
×
910
            &self.smart_contract_id,
×
911
            naddrs.len();
×
912
            "connected_replicas" => ?naddrs,
913
        );
914
        for naddr in naddrs.into_iter() {
3,847,696✔
915
            debug!(
3,847,696✔
916
                "{:?}: {}: getchunksinv_begin: Send StackerDBGetChunksInv at {} to {:?}",
917
                network.get_local_peer(),
×
918
                &self.smart_contract_id,
×
919
                &network.get_chain_view().rc_consensus_hash,
×
920
                &naddr,
×
921
            );
922
            let chunks_req = self.make_getchunkinv(&network.get_chain_view().rc_consensus_hash);
3,847,696✔
923
            if let Err(e) = self.comms.neighbor_send(network, &naddr, chunks_req) {
3,847,696✔
924
                debug!(
22,203✔
925
                    "{:?}: {}: failed to send StackerDBGetChunkInv to {:?}: {:?}",
926
                    network.get_local_peer(),
×
927
                    &self.smart_contract_id,
×
928
                    &naddr,
×
929
                    &e
×
930
                );
931
                continue;
22,203✔
932
            }
3,825,493✔
933
            already_sent.push(naddr);
3,825,493✔
934
        }
935
        self.send_getchunkinv_to_inbound_neighbors(network, &already_sent);
3,831,005✔
936
    }
3,831,005✔
937

938
    /// Collect each chunk inventory request.
939
    /// Restores self.connected_replicas based on messages received.
940
    /// Return Ok(true) if we've received all pending messages
941
    /// Return Ok(false) if not
942
    pub fn getchunksinv_try_finish(
17,165,514✔
943
        &mut self,
17,165,514✔
944
        network: &mut PeerNetwork,
17,165,514✔
945
    ) -> Result<bool, net_error> {
17,165,514✔
946
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
17,165,514✔
947
            let chunk_inv_opt = match message.payload {
3,763,310✔
948
                StacksMessageType::StackerDBChunkInv(data) => {
3,325,782✔
949
                    if data.slot_versions.len() != self.num_slots {
3,325,782✔
950
                        info!("{:?}: {}: Received malformed StackerDBChunkInv from {:?}: expected {} chunks, got {}", network.get_local_peer(), &self.smart_contract_id, &naddr, self.num_slots, data.slot_versions.len());
2,486✔
951
                        None
2,486✔
952
                    } else {
953
                        Some(data)
3,323,296✔
954
                    }
955
                }
956
                StacksMessageType::Nack(data) => {
437,528✔
957
                    debug!(
437,528✔
958
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBGetChunksInv with code {}",
959
                        network.get_local_peer(),
×
960
                        &self.smart_contract_id,
×
961
                        &naddr,
×
962
                        data.error_code
963
                    );
964
                    if data.error_code == NackErrorCodes::StaleView
437,528✔
965
                        || data.error_code == NackErrorCodes::FutureView
140,365✔
966
                    {
437,528✔
967
                        self.connected_replicas.remove(&naddr);
437,528✔
968
                        self.stale_neighbors.insert(naddr);
437,528✔
969
                    } else {
437,528✔
970
                        self.unpin_connected_replica(network, &naddr);
×
971
                    }
×
972
                    continue;
437,528✔
973
                }
974
                x => {
×
975
                    info!(
×
976
                        "{:?}: {}: Received unexpected message {:?}",
977
                        network.get_local_peer(),
×
978
                        &self.smart_contract_id,
×
979
                        &x
×
980
                    );
981
                    self.unpin_connected_replica(network, &naddr);
×
982
                    continue;
×
983
                }
984
            };
985
            debug!(
3,325,782✔
986
                "{:?}: {}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}: {:?}",
987
                network.get_local_peer(),
×
988
                &self.smart_contract_id,
×
989
                &naddr,
×
990
                &chunk_inv_opt
×
991
            );
992

993
            if let Some(chunk_inv) = chunk_inv_opt {
3,325,782✔
994
                self.chunk_invs.insert(naddr.clone(), chunk_inv);
3,323,296✔
995
                self.connected_replicas.insert(naddr);
3,323,296✔
996
            }
3,323,296✔
997
        }
998
        if self.comms.count_inflight() > 0 {
17,165,514✔
999
            // not done yet, so blocked
1000
            return Ok(false);
13,390,320✔
1001
        }
3,775,194✔
1002

1003
        // got everything. Calculate download priority
1004
        let priorities = self.make_chunk_request_schedule(network, None)?;
3,775,194✔
1005
        let expected_versions = self.stackerdbs.get_slot_versions(&self.smart_contract_id)?;
3,775,194✔
1006

1007
        self.chunk_fetch_priorities = priorities;
3,775,194✔
1008
        self.expected_versions = expected_versions;
3,775,194✔
1009
        Ok(true)
3,775,194✔
1010
    }
17,165,514✔
1011

1012
    /// Ask each prioritized replica for some chunks we need.
1013
    /// Return Ok(true) if we processed all requested chunks
1014
    /// Return Ok(false) if there are still some requests to make
1015
    pub fn getchunks_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,111,204✔
1016
        if self.chunk_fetch_priorities.is_empty() {
4,111,204✔
1017
            // done
1018
            debug!(
3,832,472✔
1019
                "{:?}: {}: getchunks_begin: no chunks prioritized",
1020
                network.get_local_peer(),
×
1021
                &self.smart_contract_id
×
1022
            );
1023
            return Ok(true);
3,832,472✔
1024
        }
278,732✔
1025

1026
        let mut cur_priority = self.next_chunk_fetch_priority % self.chunk_fetch_priorities.len();
278,732✔
1027

1028
        debug!(
278,732✔
1029
            "{:?}: {}: getchunks_begin: Issue up to {} StackerDBGetChunk requests",
1030
            &network.get_local_peer(),
×
1031
            &self.smart_contract_id,
×
1032
            self.request_capacity;
1033
            "chunk_fetch_priorities" => ?self.chunk_fetch_priorities,
1034
        );
1035

1036
        let mut requested = 0;
278,732✔
1037
        let mut unpin = HashSet::new();
278,732✔
1038

1039
        // fill up our comms with $capacity requests
1040
        for _i in 0..self.request_capacity {
1,672,392✔
1041
            if self.comms.count_inflight() >= self.request_capacity {
1,672,392✔
1042
                break;
×
1043
            }
1,672,392✔
1044
            let cur_fetch_priority = self
1,672,392✔
1045
                .chunk_fetch_priorities
1,672,392✔
1046
                .get_mut(cur_priority)
1,672,392✔
1047
                .ok_or_else(|| {
1,672,392✔
1048
                    error!(
×
1049
                        "Error setting chunk fetch priories. Priority index out of bounds";
1050
                        "cur_priority" => cur_priority,
×
1051
                    );
1052
                    net_error::InvalidState
×
1053
                })?;
×
1054

1055
            let chunk_request = cur_fetch_priority.0.clone();
1,672,392✔
1056
            let selected_neighbor_opt = cur_fetch_priority
1,672,392✔
1057
                .1
1,672,392✔
1058
                .iter()
1,672,392✔
1059
                .enumerate()
1,672,392✔
1060
                .find(|(_i, naddr)| !self.comms.has_inflight(naddr));
1,672,392✔
1061

1062
            let (idx, selected_neighbor) = if let Some(x) = selected_neighbor_opt {
1,672,392✔
1063
                x
79,062✔
1064
            } else {
1065
                continue;
1,593,330✔
1066
            };
1067

1068
            debug!(
79,062✔
1069
                "{:?}: {}: getchunks_begin: Send StackerDBGetChunk(id={},ver={}) at {} to {}",
1070
                &network.get_local_peer(),
×
1071
                &self.smart_contract_id,
×
1072
                chunk_request.slot_id,
1073
                chunk_request.slot_version,
1074
                &chunk_request.rc_consensus_hash,
×
1075
                &selected_neighbor
×
1076
            );
1077

1078
            if let Err(e) = self.comms.neighbor_send(
79,062✔
1079
                network,
79,062✔
1080
                selected_neighbor,
79,062✔
1081
                StacksMessageType::StackerDBGetChunk(chunk_request.clone()),
79,062✔
1082
            ) {
79,062✔
1083
                info!(
378✔
1084
                    "{:?}: {} Failed to request chunk {} from {:?}: {:?}",
1085
                    network.get_local_peer(),
378✔
1086
                    &self.smart_contract_id,
378✔
1087
                    chunk_request.slot_id,
1088
                    selected_neighbor,
1089
                    &e
378✔
1090
                );
1091
                unpin.insert(selected_neighbor.clone());
378✔
1092
                continue;
378✔
1093
            }
78,684✔
1094

1095
            requested += 1;
78,684✔
1096

1097
            // don't ask this neighbor again
1098
            cur_fetch_priority.1.remove(idx);
78,684✔
1099

1100
            // next-prioritized chunk
1101
            cur_priority = (cur_priority + 1) % self.chunk_fetch_priorities.len();
78,684✔
1102
        }
1103
        let _ = unpin
278,732✔
1104
            .into_iter()
278,732✔
1105
            .map(|naddr| self.unpin_connected_replica(network, &naddr));
278,732✔
1106

1107
        if requested == 0 && self.comms.count_inflight() == 0 {
278,732✔
1108
            return Err(net_error::PeerNotConnected(format!(
558✔
1109
                "StackerDB getchunks_begin: no chunks to request"
558✔
1110
            )));
558✔
1111
        }
278,174✔
1112

1113
        self.next_chunk_fetch_priority = cur_priority;
278,174✔
1114

1115
        Ok(self.chunk_fetch_priorities.is_empty())
278,174✔
1116
    }
4,111,204✔
1117

1118
    /// Collect chunk replies from neighbors
1119
    /// Returns Ok(true) if all inflight messages have been received (or dealt with)
1120
    /// Returns Ok(false) otherwise
1121
    pub fn getchunks_try_finish(
4,110,646✔
1122
        &mut self,
4,110,646✔
1123
        network: &mut PeerNetwork,
4,110,646✔
1124
        config: &StackerDBConfig,
4,110,646✔
1125
    ) -> Result<bool, net_error> {
4,110,646✔
1126
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,110,646✔
1127
            let data = match message.payload {
78,358✔
1128
                StacksMessageType::StackerDBChunk(data) => data,
77,845✔
1129
                StacksMessageType::Nack(data) => {
513✔
1130
                    debug!(
513✔
1131
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBGetChunk with code {}",
1132
                        network.get_local_peer(),
×
1133
                        &self.smart_contract_id,
×
1134
                        &naddr,
×
1135
                        data.error_code
1136
                    );
1137
                    if data.error_code == NackErrorCodes::StaleView
513✔
1138
                        || data.error_code == NackErrorCodes::FutureView
315✔
1139
                    {
198✔
1140
                        self.stale_neighbors.insert(naddr);
198✔
1141
                    } else if data.error_code == NackErrorCodes::StaleVersion {
468✔
1142
                        // try again immediately, without throttling
297✔
1143
                        self.stale_inv = true;
297✔
1144
                    }
315✔
1145
                    continue;
513✔
1146
                }
1147
                x => {
×
1148
                    info!(
×
1149
                        "{:?}: {}: Received unexpected message {:?}",
1150
                        network.get_local_peer(),
×
1151
                        &self.smart_contract_id,
×
1152
                        &x
×
1153
                    );
1154
                    self.unpin_connected_replica(network, &naddr);
×
1155
                    continue;
×
1156
                }
1157
            };
1158

1159
            // validate
1160
            if !self.validate_downloaded_chunk(network, config, &data)? {
77,845✔
1161
                info!(
54✔
1162
                    "{:?}: {}: Remote neighbor {:?} served an invalid chunk for ID {}",
1163
                    network.get_local_peer(),
54✔
1164
                    &self.smart_contract_id,
54✔
1165
                    &naddr,
54✔
1166
                    data.slot_id
1167
                );
1168
                self.unpin_connected_replica(network, &naddr);
54✔
1169
                continue;
54✔
1170
            }
77,791✔
1171

1172
            // update bookkeeping
1173
            debug!(
77,791✔
1174
                "{:?}: {}, getchunks_try_finish: Received StackerDBChunk from {:?}",
1175
                network.get_local_peer(),
×
1176
                &self.smart_contract_id,
×
1177
                &naddr
×
1178
            );
1179
            self.add_downloaded_chunk(naddr, data);
77,791✔
1180
        }
1181

1182
        Ok(self.comms.count_inflight() == 0)
4,110,646✔
1183
    }
4,110,646✔
1184

1185
    /// Push out chunks to peers
1186
    /// Returns true if there are no more chunks to push.
1187
    /// Returns false if there are
1188
    pub fn pushchunks_begin(&mut self, network: &mut PeerNetwork) -> Result<bool, net_error> {
4,042,000✔
1189
        if self.chunk_push_priorities.is_empty() && self.push_round != self.rounds {
4,042,000✔
1190
            // only do this once per round
1191
            let priorities = self.make_chunk_push_schedule(network)?;
3,774,282✔
1192
            self.chunk_push_priorities = priorities;
3,774,282✔
1193
            self.push_round = self.rounds;
3,774,282✔
1194
        }
267,718✔
1195
        if self.chunk_push_priorities.is_empty() {
4,042,000✔
1196
            // done
1197
            debug!(
3,741,901✔
1198
                "{:?}:{}: pushchunks_begin: no chunks prioritized",
1199
                network.get_local_peer(),
×
1200
                &self.smart_contract_id
×
1201
            );
1202
            return Ok(true);
3,741,901✔
1203
        }
300,099✔
1204

1205
        let mut cur_priority = self.next_chunk_push_priority % self.chunk_push_priorities.len();
300,099✔
1206

1207
        debug!(
300,099✔
1208
            "{:?}: {}: pushchunks_begin: Send up to {} StackerDBChunk pushes",
1209
            &network.get_local_peer(),
×
1210
            &self.smart_contract_id,
×
1211
            self.chunk_push_priorities.len();
×
1212
            "chunk_push_priorities" => ?self.chunk_push_priorities
1213
        );
1214

1215
        // fill up our comms with $capacity requests
1216
        let mut num_sent = 0;
300,099✔
1217
        for _i in 0..self.chunk_push_priorities.len() {
541,871✔
1218
            if self.comms.count_inflight() >= self.request_capacity {
541,871✔
1219
                break;
×
1220
            }
541,871✔
1221
            let cur_push_priority = self
541,871✔
1222
                .chunk_push_priorities
541,871✔
1223
                .get_mut(cur_priority)
541,871✔
1224
                .ok_or_else(|| {
541,871✔
1225
                    error!(
×
1226
                        "Error setting chunk push priories. Priority index out of bounds";
1227
                        "cur_priority" => cur_priority,
×
1228
                    );
1229
                    net_error::InvalidState
×
1230
                })?;
×
1231

1232
            let chunk_push = cur_push_priority.0.clone();
541,871✔
1233
            // try the first neighbor in the chunk_push_priorities list
1234
            let selected_neighbor_opt = cur_push_priority.1.first().map(|neighbor| (0, neighbor));
541,871✔
1235

1236
            let Some((idx, selected_neighbor)) = selected_neighbor_opt else {
541,871✔
1237
                debug!("{:?}: {}: pushchunks_begin: no available neighbor to send StackerDBChunk(id={},ver={}) to",
439,471✔
1238
                    &network.get_local_peer(),
×
1239
                    &self.smart_contract_id,
×
1240
                    chunk_push.chunk_data.slot_id,
1241
                    chunk_push.chunk_data.slot_version,
1242
                );
1243

1244
                // next-prioritized chunk
1245
                cur_priority = (cur_priority + 1) % self.chunk_push_priorities.len();
439,471✔
1246
                continue;
439,471✔
1247
            };
1248

1249
            debug!(
102,400✔
1250
                "{:?}: {}: pushchunks_begin: Send StackerDBChunk(id={},ver={}) at {} to {}",
1251
                &network.get_local_peer(),
×
1252
                &self.smart_contract_id,
×
1253
                chunk_push.chunk_data.slot_id,
1254
                chunk_push.chunk_data.slot_version,
1255
                &chunk_push.rc_consensus_hash,
×
1256
                &selected_neighbor
×
1257
            );
1258

1259
            let slot_id = chunk_push.chunk_data.slot_id;
102,400✔
1260
            let slot_version = chunk_push.chunk_data.slot_version;
102,400✔
1261
            if let Err(e) = self.comms.neighbor_send(
102,400✔
1262
                network,
102,400✔
1263
                selected_neighbor,
102,400✔
1264
                StacksMessageType::StackerDBPushChunk(chunk_push),
102,400✔
1265
            ) {
102,400✔
1266
                info!(
90✔
1267
                    "{:?}: {}: Failed to send chunk {} from {:?}: {:?}",
1268
                    network.get_local_peer(),
90✔
1269
                    &self.smart_contract_id,
90✔
1270
                    slot_id,
1271
                    selected_neighbor,
1272
                    &e
90✔
1273
                );
1274
                continue;
90✔
1275
            }
102,310✔
1276

1277
            // record what we just sent
1278
            self.chunk_push_receipts
102,310✔
1279
                .insert(selected_neighbor.clone(), (slot_id, slot_version));
102,310✔
1280

1281
            // don't send to this neighbor again
1282
            cur_push_priority.1.remove(idx);
102,310✔
1283

1284
            // next-prioritized chunk
1285
            cur_priority = (cur_priority + 1) % self.chunk_push_priorities.len();
102,310✔
1286

1287
            num_sent += 1;
102,310✔
1288
            if num_sent > self.request_capacity {
102,310✔
1289
                break;
2✔
1290
            }
102,308✔
1291
        }
1292
        self.next_chunk_push_priority = cur_priority;
300,099✔
1293
        Ok(self
300,099✔
1294
            .chunk_push_priorities
300,099✔
1295
            .iter()
300,099✔
1296
            .fold(0usize, |acc, (_chunk, num_naddrs)| {
541,877✔
1297
                acc.saturating_add(num_naddrs.len())
541,877✔
1298
            })
541,877✔
1299
            == 0)
1300
    }
4,042,000✔
1301

1302
    /// Collect push-chunk replies from neighbors.
1303
    /// If a remote neighbor replies with a chunk-inv for a pushed chunk which contains newer data
1304
    /// than we have, then set `self.need_resync` to true.
1305
    /// Returns true if all inflight messages have been received (or dealt with)
1306
    /// Returns false otherwise
1307
    pub fn pushchunks_try_finish(&mut self, network: &mut PeerNetwork) -> bool {
4,042,000✔
1308
        for (naddr, message) in self.comms.collect_replies(network).into_iter() {
4,042,000✔
1309
            let new_chunk_inv = match message.payload {
58,190✔
1310
                StacksMessageType::StackerDBChunkInv(data) => data,
58,190✔
1311
                StacksMessageType::Nack(data) => {
×
1312
                    debug!(
×
1313
                        "{:?}: {}: remote peer {:?} NACK'ed our StackerDBChunk with code {}",
1314
                        network.get_local_peer(),
×
1315
                        &self.smart_contract_id,
×
1316
                        &naddr,
×
1317
                        data.error_code
1318
                    );
1319
                    if data.error_code == NackErrorCodes::StaleView
×
1320
                        || data.error_code == NackErrorCodes::FutureView
×
1321
                    {
×
1322
                        self.stale_neighbors.insert(naddr);
×
1323
                    }
×
1324
                    continue;
×
1325
                }
1326
                x => {
×
1327
                    info!(
×
1328
                        "{:?}: {}: Received unexpected message {:?}",
1329
                        network.get_local_peer(),
×
1330
                        &self.smart_contract_id,
×
1331
                        &x
×
1332
                    );
1333
                    continue;
×
1334
                }
1335
            };
1336

1337
            // must be well-formed
1338
            if new_chunk_inv.slot_versions.len() != self.num_slots {
58,190✔
1339
                info!("{:?}: {}: Received malformed StackerDBChunkInv from {:?}: expected {} chunks, got {}", network.get_local_peer(), &self.smart_contract_id, &naddr, self.num_slots, new_chunk_inv.slot_versions.len());
×
1340
                continue;
×
1341
            }
58,190✔
1342

1343
            // update bookkeeping
1344
            debug!(
58,190✔
1345
                "{:?}: {}: pushchunks_try_finish: Received StackerDBChunkInv from {:?}",
1346
                network.get_local_peer(),
×
1347
                &self.smart_contract_id,
×
1348
                &naddr
×
1349
            );
1350

1351
            if let Some((slot_id, _)) = self.chunk_push_receipts.get(&naddr) {
58,190✔
1352
                self.need_resync = self.need_resync
58,190✔
1353
                    || self.add_pushed_chunk(network, naddr, new_chunk_inv, *slot_id);
58,190✔
1354
            }
×
1355
        }
1356

1357
        let inflight = self.comms.count_inflight();
4,042,000✔
1358
        debug!(
4,042,000✔
1359
            "{:?}: {}: inflight messages: {:?}",
1360
            network.get_local_peer(),
×
1361
            &self.smart_contract_id,
×
1362
            inflight
1363
        );
1364
        inflight == 0
4,042,000✔
1365
    }
4,042,000✔
1366

1367
    /// Recalculate the download schedule based on chunkinvs received on push
1368
    pub fn recalculate_chunk_request_schedule(
58,190✔
1369
        &mut self,
58,190✔
1370
        network: &PeerNetwork,
58,190✔
1371
    ) -> Result<(), net_error> {
58,190✔
1372
        // figure out the new expected versions
1373
        let mut expected_versions = vec![0u32; self.num_slots];
58,190✔
1374
        for (_, chunk_inv) in self.chunk_invs.iter() {
58,193✔
1375
            for (slot_version, expected_version) in chunk_inv
288,449✔
1376
                .slot_versions
58,193✔
1377
                .iter()
58,193✔
1378
                .zip(expected_versions.iter_mut())
58,193✔
1379
            {
288,449✔
1380
                *expected_version = (*slot_version).max(*expected_version);
288,449✔
1381
            }
288,449✔
1382
        }
1383

1384
        let priorities =
58,190✔
1385
            self.make_chunk_request_schedule(network, Some(expected_versions.clone()))?;
58,190✔
1386

1387
        self.chunk_fetch_priorities = priorities;
58,190✔
1388
        self.expected_versions = expected_versions;
58,190✔
1389
        Ok(())
58,190✔
1390
    }
58,190✔
1391

1392
    /// Forcibly wake up the state machine if it is throttled
1393
    pub fn wakeup(&mut self) {
2,232,611✔
1394
        debug!("wake up StackerDB sync for {}", &self.smart_contract_id);
2,232,611✔
1395
        self.last_run_ts = 0;
2,232,611✔
1396
    }
2,232,611✔
1397

1398
    /// Run the state machine.
1399
    /// If we run to completion, then reset and return the sync result.
1400
    /// Otherwise, if there's still more work to do, then return None
1401
    pub fn run(
227,665,597✔
1402
        &mut self,
227,665,597✔
1403
        network: &mut PeerNetwork,
227,665,597✔
1404
        config: &StackerDBConfig,
227,665,597✔
1405
    ) -> Result<Option<StackerDBSyncResult>, net_error> {
227,665,597✔
1406
        if network.get_connection_opts().disable_stackerdb_sync {
227,665,597✔
1407
            test_debug!(
45✔
1408
                "{:?}: stacker DB sync is disabled",
1409
                network.get_local_peer()
×
1410
            );
1411
            return Ok(None);
45✔
1412
        }
227,665,552✔
1413

1414
        // make sure we have an up-to-date chain view.
1415
        // If not, then abort and immediately retry the sync (since any queued messages we have are
1416
        // likely gonna fail)
1417
        if let Some(rc_consensus_hash) = self.rc_consensus_hash.as_ref() {
227,665,552✔
1418
            if network.get_chain_view().rc_consensus_hash != *rc_consensus_hash {
217,352,421✔
1419
                debug!("{:?}: {}: Resetting and restarting running StackerDB sync due to chain view change", network.get_local_peer(), &self.smart_contract_id);
1,780,560✔
1420
                let result = self.reset(Some(network), config);
1,780,560✔
1421
                self.state = StackerDBSyncState::ConnectBegin;
1,780,560✔
1422
                self.rc_consensus_hash = Some(network.get_chain_view().rc_consensus_hash.clone());
1,780,560✔
1423
                self.wakeup();
1,780,560✔
1424
                return Ok(Some(result));
1,780,560✔
1425
            }
215,571,861✔
1426
        } else {
10,313,131✔
1427
            self.rc_consensus_hash = Some(network.get_chain_view().rc_consensus_hash.clone());
10,313,131✔
1428
        }
10,313,131✔
1429

1430
        // throttle to write_freq
1431
        if self.last_run_ts + config.write_freq.max(1) > get_epoch_time_secs() {
225,884,992✔
1432
            debug!(
201,507,967✔
1433
                "{:?}: {}: stacker DB sync is throttled until {}",
1434
                network.get_local_peer(),
×
1435
                &self.smart_contract_id,
×
1436
                self.last_run_ts + config.write_freq
×
1437
            );
1438
            return Ok(None);
201,507,967✔
1439
        }
24,377,025✔
1440

1441
        loop {
1442
            debug!(
53,813,883✔
1443
                "{:?}: {}: stacker DB sync state is {:?}",
1444
                network.get_local_peer(),
×
1445
                &self.smart_contract_id,
×
1446
                &self.state
×
1447
            );
1448

1449
            let mut blocked = true;
53,813,883✔
1450
            match self.state {
53,813,883✔
1451
                StackerDBSyncState::ConnectBegin => {
1452
                    let done = match self.connect_begin(network) {
10,357,150✔
1453
                        Ok(done) => done,
3,853,712✔
1454
                        Err(net_error::NoSuchNeighbor) => {
1455
                            // nothing to do
1456
                            self.state = StackerDBSyncState::Finished;
6,503,438✔
1457
                            blocked = false;
6,503,438✔
1458
                            false
6,503,438✔
1459
                        }
1460
                        Err(e) => {
×
1461
                            return Err(e);
×
1462
                        }
1463
                    };
1464
                    if done {
10,357,150✔
1465
                        self.state = StackerDBSyncState::ConnectFinish;
3,831,599✔
1466
                        blocked = false;
3,831,599✔
1467
                    }
9,543,278✔
1468
                }
1469
                StackerDBSyncState::ConnectFinish => {
1470
                    let done = self.connect_try_finish(network)?;
4,029,617✔
1471
                    if done {
4,029,248✔
1472
                        self.state = StackerDBSyncState::GetChunksInvBegin;
3,831,005✔
1473
                        blocked = false;
3,831,005✔
1474
                    }
3,890,639✔
1475
                }
1476
                StackerDBSyncState::GetChunksInvBegin => {
3,831,005✔
1477
                    // does not block
3,831,005✔
1478
                    self.getchunksinv_begin(network);
3,831,005✔
1479
                    self.state = StackerDBSyncState::GetChunksInvFinish;
3,831,005✔
1480
                    blocked = false;
3,831,005✔
1481
                }
3,831,005✔
1482
                StackerDBSyncState::GetChunksInvFinish => {
1483
                    let done = self.getchunksinv_try_finish(network)?;
17,165,514✔
1484
                    if done {
17,165,514✔
1485
                        self.state = StackerDBSyncState::GetChunks;
3,775,194✔
1486
                        blocked = false;
3,775,194✔
1487
                    }
13,390,320✔
1488
                }
1489
                StackerDBSyncState::GetChunks => {
1490
                    if network.get_connection_opts().disable_stackerdb_get_chunks {
4,111,204✔
1491
                        // fault injection -- force the system to rely exclusively on push-chunk
1492
                        // behavior
1493
                        self.state = StackerDBSyncState::PushChunks;
×
1494
                        continue;
×
1495
                    }
4,111,204✔
1496

1497
                    let requests_finished = self.getchunks_begin(network)?;
4,111,204✔
1498
                    let inflight_finished = self.getchunks_try_finish(network, config)?;
4,110,646✔
1499
                    let done = requests_finished && inflight_finished;
4,110,646✔
1500
                    if done {
4,110,646✔
1501
                        self.state = StackerDBSyncState::PushChunks;
3,832,472✔
1502
                        blocked = false;
3,832,472✔
1503
                    }
3,832,484✔
1504
                }
1505
                StackerDBSyncState::PushChunks => {
1506
                    let pushes_finished = self.pushchunks_begin(network)?;
4,042,000✔
1507
                    let inflight_finished = self.pushchunks_try_finish(network);
4,042,000✔
1508
                    let done = pushes_finished && inflight_finished;
4,042,000✔
1509
                    if done {
4,042,000✔
1510
                        if self.need_resync
3,832,145✔
1511
                            && !network.get_connection_opts().disable_stackerdb_get_chunks
58,190✔
1512
                        {
1513
                            // someone pushed newer chunk data to us, and getting chunks is
1514
                            // enabled, so immediately go request them
1515
                            debug!(
58,190✔
1516
                                "{:?}: {}: immediately retry StackerDB GetChunks due to PushChunk NACK",
1517
                                network.get_local_peer(),
×
1518
                                &self.smart_contract_id
×
1519
                            );
1520
                            self.recalculate_chunk_request_schedule(network)?;
58,190✔
1521
                            self.state = StackerDBSyncState::GetChunks;
58,190✔
1522
                        } else {
3,773,955✔
1523
                            // done syncing
3,773,955✔
1524
                            self.state = StackerDBSyncState::Finished;
3,773,955✔
1525
                        }
3,773,955✔
1526
                        self.need_resync = false;
3,832,145✔
1527
                        blocked = false;
3,832,145✔
1528
                    }
209,855✔
1529
                }
1530
                StackerDBSyncState::Finished => {
1531
                    let stale_inv = self.stale_inv;
10,277,393✔
1532

1533
                    let result = self.reset(Some(network), config);
10,277,393✔
1534
                    self.state = StackerDBSyncState::ConnectBegin;
10,277,393✔
1535

1536
                    if stale_inv {
10,277,393✔
1537
                        debug!(
×
1538
                            "{:?}: {}: immediately retry StackerDB sync due to stale inventory",
1539
                            network.get_local_peer(),
×
1540
                            &self.smart_contract_id
×
1541
                        );
1542
                        self.wakeup();
×
1543
                    }
10,277,393✔
1544
                    return Ok(Some(result));
10,277,393✔
1545
                }
1546
            };
1547

1548
            if blocked {
43,535,563✔
1549
                return Ok(None);
14,098,705✔
1550
            }
29,436,858✔
1551
        }
1552
    }
227,665,597✔
1553
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc