• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 15484013348

06 Jun 2025 06:08AM UTC coverage: 72.04% (+0.3%) from 71.789%
15484013348

push

github

web-flow
fix(network-discovery): add back idle event handling (#7194)

Description
---
fix(network-discovery): add back idle event handling

Motivation and Context
---
network discovery was spinning at full speed because the Idle event
transition was removed. Network logs would rotate < 1s.

```
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG NetworkDiscovery::Ready: Peer list contains 759 entries. Current discovery rounds in this cycle: 0.
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG First active round (current_num_rounds = 0) and num_peers (759) >= min_desired_peers (16). Forcing DHT discovery.
 [comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG Selecting 5 random peers for discovery (last round info available: false, new peers in last round: false).
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG No suitable peers found for the forced DHT discovery round (current_num_rounds = 0 path). Transitioning to Idle.
 [comms::dht::network_discovery] [Thread:123190302967360] DEBUG Transition triggered from current state `Ready` by event `Idle`
[comms::dht::network_discovery] [Thread:123190302967360] DEBUG No state transition for event `Idle`. The current state is `Ready`

...instant rinse and repeat...
```

This PR adds the idle state transition back. Note that idle will idle
for 30 minutes so should only transition when all work is done and we
have downloaded sufficient peers.

How Has This Been Tested?
---
Manually - console wallet with empty peer db

What process can a PR reviewer use to test or verify this change?
---

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other... (continued)

3 of 4 new or added lines in 2 files covered. (75.0%)

412 existing lines in 30 files now uncovered.

80882 of 112274 relevant lines covered (72.04%)

242938.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.92
/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    convert::TryFrom,
24
    sync::Arc,
25
    time::{Duration, Instant},
26
};
27

28
use futures::StreamExt;
29
use log::*;
30
use primitive_types::U512;
31
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
32
use tari_comms::{
33
    connectivity::ConnectivityRequester,
34
    peer_manager::NodeId,
35
    protocol::rpc::{RpcClient, RpcError},
36
    PeerConnection,
37
};
38
use tari_utilities::hex::Hex;
39

40
use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
41
use crate::{
42
    base_node::sync::{
43
        ban::PeerBanManager,
44
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
45
        hooks::Hooks,
46
        rpc,
47
        BlockchainSyncConfig,
48
        SyncPeer,
49
    },
50
    blocks::{BlockHeader, ChainBlock, ChainHeader},
51
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
52
    common::{rolling_avg::RollingAverageTime, BanPeriod},
53
    consensus::ConsensusManager,
54
    proof_of_work::randomx_factory::RandomXFactory,
55
    proto::{
56
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
57
        core::BlockHeader as ProtoBlockHeader,
58
    },
59
};
60

61
/// Log target used by this module's `debug!`/`info!`/`warn!` calls.
const LOG_TARGET: &str = "c::bn::header_sync";
62

63
/// Maximum number of times `synchronize` will raise the allowed sync latency
/// (by `max_latency_increase`) before giving up with `AllSyncPeersExceedLatency`.
const MAX_LATENCY_INCREASES: usize = 5;
64

65
/// Synchronizes block headers from remote sync peers: detects the chain split,
/// validates the peers' headers and bans/removes peers that misbehave.
pub struct HeaderSynchronizer<'a, B> {
66
    // Sync configuration (latency limits, RPC deadline, ban periods, ...).
    config: BlockchainSyncConfig,
67
    db: AsyncBlockchainDb<B>,
68
    // Validates headers received from peers against consensus rules.
    header_validator: BlockHeaderSyncValidator<B>,
69
    // Used to dial sync peers.
    connectivity: ConnectivityRequester,
70
    // Candidate sync peers; entries are removed as sync attempts fail.
    sync_peers: &'a mut Vec<SyncPeer>,
71
    // Callbacks invoked on starting/progress/rewind events.
    hooks: Hooks,
72
    // Snapshot of local chain metadata taken before the sync attempt started.
    local_cached_metadata: &'a ChainMetadata,
73
    // Bans misbehaving peers for short/long periods from config.
    peer_ban_manager: PeerBanManager,
74
}
75

76
impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
77
    pub fn new(
10✔
78
        config: BlockchainSyncConfig,
10✔
79
        db: AsyncBlockchainDb<B>,
10✔
80
        consensus_rules: ConsensusManager,
10✔
81
        connectivity: ConnectivityRequester,
10✔
82
        sync_peers: &'a mut Vec<SyncPeer>,
10✔
83
        randomx_factory: RandomXFactory,
10✔
84
        local_metadata: &'a ChainMetadata,
10✔
85
    ) -> Self {
10✔
86
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
10✔
87
        Self {
10✔
88
            config,
10✔
89
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
10✔
90
            db,
10✔
91
            connectivity,
10✔
92
            sync_peers,
10✔
93
            hooks: Default::default(),
10✔
94
            local_cached_metadata: local_metadata,
10✔
95
            peer_ban_manager,
10✔
96
        }
10✔
97
    }
10✔
98

99
    pub fn on_starting<H>(&mut self, hook: H)
10✔
100
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
10✔
101
        self.hooks.add_on_starting_hook(hook);
10✔
102
    }
10✔
103

104
    pub fn on_progress<H>(&mut self, hook: H)
10✔
105
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
10✔
106
        self.hooks.add_on_progress_header_hook(hook);
10✔
107
    }
10✔
108

109
    pub fn on_rewind<H>(&mut self, hook: H)
10✔
110
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
10✔
111
        self.hooks.add_on_rewind_hook(hook);
10✔
112
    }
10✔
113

114
    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
10✔
115
        debug!(target: LOG_TARGET, "Starting header sync.",);
10✔
116

117
        info!(
10✔
118
            target: LOG_TARGET,
×
119
            "Synchronizing headers ({} candidate peers selected)",
×
120
            self.sync_peers.len()
×
121
        );
122
        let mut max_latency = self.config.initial_max_sync_latency;
10✔
123
        let mut latency_increases_counter = 0;
10✔
124
        loop {
125
            match self.try_sync_from_all_peers(max_latency).await {
10✔
UNCOV
126
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
×
127
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
×
128
                    // If we have few sync peers, throw this out to be retried later
×
129
                    if self.sync_peers.len() < 2 {
×
130
                        return Err(err);
×
131
                    }
×
132
                    max_latency += self.config.max_latency_increase;
×
133
                    latency_increases_counter += 1;
×
134
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
135
                        return Err(err);
×
136
                    }
×
137
                },
138
                Err(err) => break Err(err),
10✔
139
            }
140
        }
141
    }
10✔
142

143
    #[allow(clippy::too_many_lines)]
144
    pub async fn try_sync_from_all_peers(
10✔
145
        &mut self,
10✔
146
        max_latency: Duration,
10✔
147
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
10✔
148
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
10✔
149
        info!(
10✔
150
            target: LOG_TARGET,
×
151
            "Attempting to sync headers ({} sync peers)",
×
152
            sync_peer_node_ids.len()
×
153
        );
154
        let mut latency_counter = 0usize;
10✔
155
        for node_id in sync_peer_node_ids {
20✔
156
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
10✔
UNCOV
157
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
×
158
                Err(err) => {
10✔
159
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
10✔
160
                    if let Some(reason) = ban_reason {
10✔
161
                        warn!(target: LOG_TARGET, "{}", err);
×
162
                        let duration = match reason.ban_duration {
×
163
                            BanPeriod::Short => self.config.short_ban_period,
×
164
                            BanPeriod::Long => self.config.ban_period,
×
165
                        };
166
                        self.peer_ban_manager
×
167
                            .ban_peer_if_required(&node_id, reason.reason, duration)
×
168
                            .await;
×
169
                    }
10✔
170
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
10✔
171
                        latency_counter += 1;
×
172
                    } else {
10✔
173
                        self.remove_sync_peer(&node_id);
10✔
174
                    }
10✔
175
                },
176
            }
177
        }
178

179
        if self.sync_peers.is_empty() {
10✔
180
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
10✔
181
        } else if latency_counter >= self.sync_peers.len() {
×
182
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
×
183
        } else {
184
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
×
185
        }
186
    }
10✔
187

188
    async fn connect_and_attempt_sync(
10✔
189
        &mut self,
10✔
190
        node_id: &NodeId,
10✔
191
        max_latency: Duration,
10✔
192
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
10✔
193
        let peer_index = self
10✔
194
            .get_sync_peer_index(node_id)
10✔
195
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
10✔
196
        let sync_peer = &self.sync_peers[peer_index];
10✔
197
        self.hooks.call_on_starting_hook(sync_peer);
10✔
198

199
        let mut conn = self.dial_sync_peer(node_id).await?;
10✔
UNCOV
200
        debug!(
×
201
            target: LOG_TARGET,
×
202
            "Attempting to synchronize headers with `{}`", node_id
×
203
        );
204

UNCOV
205
        let config = RpcClient::builder()
×
UNCOV
206
            .with_deadline(self.config.rpc_deadline)
×
UNCOV
207
            .with_deadline_grace_period(Duration::from_secs(5));
×
UNCOV
208
        let mut client = conn
×
UNCOV
209
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
×
UNCOV
210
            .await?;
×
211

UNCOV
212
        let latency = client
×
UNCOV
213
            .get_last_request_latency()
×
UNCOV
214
            .expect("unreachable panic: last request latency must be set after connect");
×
UNCOV
215
        self.sync_peers[peer_index].set_latency(latency);
×
UNCOV
216
        if latency > max_latency {
×
217
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
218
                peer: conn.peer_node_id().clone(),
×
219
                latency,
×
220
                max_latency,
×
221
            });
×
UNCOV
222
        }
×
UNCOV
223

×
UNCOV
224
        debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
×
UNCOV
225
        let sync_peer = self.sync_peers[peer_index].clone();
×
UNCOV
226
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
×
UNCOV
227
        Ok((sync_peer, sync_result))
×
228
    }
10✔
229

230
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
10✔
231
        let timer = Instant::now();
10✔
232
        debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
10✔
233
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
10✔
UNCOV
234
        info!(
×
235
            target: LOG_TARGET,
×
236
            "Successfully dialed sync peer {} in {:.2?}",
×
237
            node_id,
×
238
            timer.elapsed()
×
239
        );
UNCOV
240
        Ok(conn)
×
241
    }
10✔
242

UNCOV
243
    async fn attempt_sync(
×
UNCOV
244
        &mut self,
×
UNCOV
245
        sync_peer: &SyncPeer,
×
UNCOV
246
        mut client: rpc::BaseNodeSyncRpcClient,
×
UNCOV
247
        max_latency: Duration,
×
UNCOV
248
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
×
UNCOV
249
        let latency = client.get_last_request_latency();
×
UNCOV
250
        debug!(
×
251
            target: LOG_TARGET,
×
252
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
×
253
            sync_peer.node_id(),
×
254
            latency.unwrap_or_default().as_millis()
×
255
        );
256

257
        // Fetch best local data at the beginning of the sync process
UNCOV
258
        let best_block_metadata = self.db.get_chain_metadata().await?;
×
UNCOV
259
        let best_header = self.db.fetch_last_chain_header().await?;
×
UNCOV
260
        let best_block_header = self
×
UNCOV
261
            .db
×
UNCOV
262
            .fetch_chain_header(best_block_metadata.best_block_height())
×
UNCOV
263
            .await?;
×
UNCOV
264
        let best_header_height = best_header.height();
×
UNCOV
265
        let best_block_height = best_block_header.height();
×
UNCOV
266

×
UNCOV
267
        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
×
268
        {
269
            return Err(BlockHeaderSyncError::ChainStorageError(
×
270
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
×
271
            ));
×
UNCOV
272
        }
×
273

274
        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
275
        //   peer, but they claimed better POW before we attempted sync.
276
        // - This method will return ban-able errors for certain offenses.
UNCOV
277
        let (header_sync_status, peer_response) = self
×
UNCOV
278
            .determine_sync_status(sync_peer, best_header.clone(), best_block_header.clone(), &mut client)
×
UNCOV
279
            .await?;
×
280

UNCOV
281
        match header_sync_status.clone() {
×
282
            HeaderSyncStatus::InSyncOrAhead => {
UNCOV
283
                debug!(
×
284
                    target: LOG_TARGET,
×
285
                    "Headers are in sync at height {} but tip is {}. Proceeding to archival/pruned block sync",
×
286
                    best_header_height,
287
                    best_block_height
288
                );
289

UNCOV
290
                Ok(AttemptSyncResult {
×
UNCOV
291
                    headers_returned: peer_response.peer_headers.len() as u64,
×
UNCOV
292
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
×
UNCOV
293
                    header_sync_status,
×
UNCOV
294
                })
×
295
            },
UNCOV
296
            HeaderSyncStatus::Lagging(split_info) => {
×
UNCOV
297
                self.hooks.call_on_progress_header_hooks(
×
UNCOV
298
                    split_info
×
UNCOV
299
                        .best_block_header
×
UNCOV
300
                        .height()
×
UNCOV
301
                        .checked_sub(split_info.reorg_steps_back)
×
UNCOV
302
                        .unwrap_or_default(),
×
UNCOV
303
                    sync_peer.claimed_chain_metadata().best_block_height(),
×
UNCOV
304
                    sync_peer,
×
UNCOV
305
                );
×
UNCOV
306
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
×
UNCOV
307
                    .await?;
×
UNCOV
308
                Ok(AttemptSyncResult {
×
UNCOV
309
                    headers_returned: peer_response.peer_headers.len() as u64,
×
UNCOV
310
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
×
UNCOV
311
                    header_sync_status,
×
UNCOV
312
                })
×
313
            },
314
        }
UNCOV
315
    }
×
316

317
    #[allow(clippy::too_many_lines)]
UNCOV
318
    async fn find_chain_split(
×
UNCOV
319
        &mut self,
×
UNCOV
320
        peer_node_id: &NodeId,
×
UNCOV
321
        client: &mut rpc::BaseNodeSyncRpcClient,
×
UNCOV
322
        header_count: u64,
×
UNCOV
323
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
×
324
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
325
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
326
        // and keep us busy going back until the genesis.
327
        // 20 x 500 = max 10,000 block split can be detected
328
        const MAX_CHAIN_SPLIT_ITERS: usize = 20;
329

UNCOV
330
        let mut offset = 0;
×
UNCOV
331
        let mut iter_count = 0;
×
332
        loop {
UNCOV
333
            iter_count += 1;
×
UNCOV
334
            if iter_count > MAX_CHAIN_SPLIT_ITERS {
×
335
                warn!(
×
336
                    target: LOG_TARGET,
×
337
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
338
                    peer_node_id,
×
339
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
×
340
                );
341
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
UNCOV
342
            }
×
343

UNCOV
344
            let block_hashes = self
×
UNCOV
345
                .db
×
UNCOV
346
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
×
UNCOV
347
                .await?;
×
UNCOV
348
            debug!(
×
349
                target: LOG_TARGET,
×
350
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
×
351
                offset,
×
352
                offset + NUM_CHAIN_SPLIT_HEADERS,
×
353
                peer_node_id,
×
354
                block_hashes.len()
×
355
            );
356

357
            // No further hashes to send.
UNCOV
358
            if block_hashes.is_empty() {
×
359
                warn!(
×
360
                    target: LOG_TARGET,
×
361
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
362
                    peer_node_id,
×
363
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
×
364
                );
365
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
UNCOV
366
            }
×
UNCOV
367

×
UNCOV
368
            let request = FindChainSplitRequest {
×
UNCOV
369
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
×
UNCOV
370
                header_count,
×
UNCOV
371
            };
×
372

UNCOV
373
            let resp = match client.find_chain_split(request).await {
×
UNCOV
374
                Ok(r) => r,
×
375
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
×
376
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
×
377
                    // send. Exit early in this case.
×
378
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
×
379
                        warn!(
×
380
                            target: LOG_TARGET,
×
381
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
382
                            peer_node_id,
×
383
                            NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
×
384
                        );
385
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
386
                    }
×
387
                    // Chain split not found, let's go further back
×
388
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
×
389
                    continue;
×
390
                },
391
                Err(err) => {
×
392
                    return Err(err.into());
×
393
                },
394
            };
UNCOV
395
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
×
396
                warn!(
×
397
                    target: LOG_TARGET,
×
398
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
×
399
                    peer_node_id,
×
400
                    resp.headers.len(),
×
401
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
402
                );
403
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
×
UNCOV
404
            }
×
UNCOV
405
            if resp.fork_hash_index >= block_hashes.len() as u64 {
×
406
                warn!(
×
407
                    target: LOG_TARGET,
×
408
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
×
409
                    peer_node_id,
×
410
                    resp.fork_hash_index,
×
411
                    block_hashes.len(),
×
412
                );
413
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
×
414
                    block_hashes.len() as u64,
×
415
                    resp.fork_hash_index,
×
416
                ));
×
UNCOV
417
            }
×
UNCOV
418
            #[allow(clippy::cast_possible_truncation)]
×
UNCOV
419
            if !resp.headers.is_empty() && resp.headers[0].prev_hash != block_hashes[resp.fork_hash_index as usize] {
×
420
                warn!(
×
421
                    target: LOG_TARGET,
×
422
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
×
423
                    peer_node_id,
424
                    resp.fork_hash_index,
425
                );
426
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
×
427
                    "Peer sent incorrect fork hash index".into(),
×
428
                ));
×
UNCOV
429
            }
×
UNCOV
430
            #[allow(clippy::cast_possible_truncation)]
×
UNCOV
431
            let chain_split_hash = block_hashes[resp.fork_hash_index as usize];
×
UNCOV
432

×
UNCOV
433
            return Ok(FindChainSplitResult {
×
UNCOV
434
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
×
UNCOV
435
                peer_headers: resp.headers,
×
UNCOV
436
                peer_fork_hash_index: resp.fork_hash_index,
×
UNCOV
437
                chain_split_hash,
×
UNCOV
438
            });
×
439
        }
UNCOV
440
    }
×
441

442
    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
443
    /// of the chain split (see [HeaderSyncStatus]).
444
    ///
445
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
446
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
UNCOV
447
    async fn determine_sync_status(
×
UNCOV
448
        &mut self,
×
UNCOV
449
        sync_peer: &SyncPeer,
×
UNCOV
450
        best_header: ChainHeader,
×
UNCOV
451
        best_block_header: ChainHeader,
×
UNCOV
452
        client: &mut rpc::BaseNodeSyncRpcClient,
×
UNCOV
453
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
×
454
        // This method will return ban-able errors for certain offenses.
UNCOV
455
        let chain_split_result = self
×
UNCOV
456
            .find_chain_split(sync_peer.node_id(), client, HEADER_SYNC_INITIAL_MAX_HEADERS as u64)
×
UNCOV
457
            .await?;
×
UNCOV
458
        if chain_split_result.reorg_steps_back > 0 {
×
UNCOV
459
            debug!(
×
460
                target: LOG_TARGET,
×
461
                "Found chain split {} blocks back, received {} headers from peer `{}`",
×
462
                chain_split_result.reorg_steps_back,
×
463
                chain_split_result.peer_headers.len(),
×
464
                sync_peer
465
            );
UNCOV
466
        }
×
467

468
        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
469
        // accumulated difficulty.
UNCOV
470
        if chain_split_result.peer_headers.is_empty() {
×
471
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
472
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
473
            // we can ban the peer.
UNCOV
474
            if best_header.height() == best_block_header.height() {
×
475
                warn!(
×
476
                    target: LOG_TARGET,
×
477
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
×
478
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
×
479
                    sync_peer.node_id(),
×
480
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
481
                    best_block_header.accumulated_data().total_accumulated_difficulty,
×
482
                );
483
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
484
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
485
                    actual: None,
×
486
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
×
487
                });
×
UNCOV
488
            }
×
UNCOV
489
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
×
UNCOV
490
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
×
UNCOV
491
        }
×
492

UNCOV
493
        let headers = chain_split_result
×
UNCOV
494
            .peer_headers
×
UNCOV
495
            .clone()
×
UNCOV
496
            .into_iter()
×
UNCOV
497
            .map(BlockHeader::try_from)
×
UNCOV
498
            .collect::<Result<Vec<_>, _>>()
×
UNCOV
499
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
×
UNCOV
500
        let num_new_headers = headers.len();
×
UNCOV
501
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
×
UNCOV
502
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
×
UNCOV
503
        // the list
×
UNCOV
504
        if self.db.fetch_header_by_block_hash(headers[0].hash()).await?.is_some() {
×
505
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
506
                "Header already in database".to_string(),
×
507
            ));
×
UNCOV
508
        };
×
UNCOV
509

×
UNCOV
510
        self.header_validator
×
UNCOV
511
            .initialize_state(&chain_split_result.chain_split_hash)
×
UNCOV
512
            .await?;
×
UNCOV
513
        for header in headers {
×
UNCOV
514
            debug!(
×
515
                target: LOG_TARGET,
×
516
                "Validating header #{} (Pow: {}) with hash: ({})",
×
517
                header.height,
×
518
                header.pow_algo(),
×
519
                header.hash().to_hex(),
×
520
            );
UNCOV
521
            self.header_validator.validate(header).await?;
×
522
        }
523

UNCOV
524
        debug!(
×
525
            target: LOG_TARGET,
×
526
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
×
527
        );
528

UNCOV
529
        let chain_split_info = ChainSplitInfo {
×
UNCOV
530
            best_block_header,
×
UNCOV
531
            reorg_steps_back: chain_split_result.reorg_steps_back,
×
UNCOV
532
            chain_split_hash: chain_split_result.chain_split_hash,
×
UNCOV
533
        };
×
UNCOV
534
        Ok((
×
UNCOV
535
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
×
UNCOV
536
            chain_split_result,
×
UNCOV
537
        ))
×
UNCOV
538
    }
×
539

UNCOV
540
    /// Rewinds the local chain to the given split hash, returning the blocks
    /// that were removed from the previous main chain.
    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
        debug!(
            target: LOG_TARGET,
            "Deleting headers that no longer form part of the main chain up until split at {}",
            split_hash.to_hex()
        );

        let removed_blocks = self.db.rewind_to_hash(split_hash).await?;
        debug!(
            target: LOG_TARGET,
            "Rewound {} block(s) in preparation for header sync",
            removed_blocks.len()
        );
        Ok(removed_blocks)
    }
×
555

556
    #[allow(clippy::too_many_lines)]
UNCOV
557
    async fn synchronize_headers(
×
UNCOV
558
        &mut self,
×
UNCOV
559
        mut sync_peer: SyncPeer,
×
UNCOV
560
        client: &mut rpc::BaseNodeSyncRpcClient,
×
UNCOV
561
        split_info: ChainSplitInfo,
×
UNCOV
562
        max_latency: Duration,
×
UNCOV
563
    ) -> Result<(), BlockHeaderSyncError> {
×
UNCOV
564
        info!(target: LOG_TARGET, "Starting header sync from peer {}", sync_peer);
×
565
        const COMMIT_EVERY_N_HEADERS: usize = 1000;
566

UNCOV
567
        let mut has_switched_to_new_chain = false;
×
UNCOV
568
        let pending_len = self.header_validator.valid_headers().len();
×
UNCOV
569

×
UNCOV
570
        // Find the hash to start syncing the rest of the headers.
×
UNCOV
571
        // The expectation cannot fail because there has been at least one valid header returned (checked in
×
UNCOV
572
        // determine_sync_status)
×
UNCOV
573
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
×
UNCOV
574
            .header_validator
×
UNCOV
575
            .current_valid_chain_tip_header()
×
UNCOV
576
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
×
UNCOV
577
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");
×
UNCOV
578

×
UNCOV
579
        // If we already have a stronger chain at this point, switch over to it.
×
UNCOV
580
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
×
UNCOV
581
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);
×
UNCOV
582

×
UNCOV
583
        if has_better_pow {
×
UNCOV
584
            debug!(
×
585
                target: LOG_TARGET,
×
586
                "Remote chain from peer {} has higher PoW. Switching",
×
587
                sync_peer.node_id()
×
588
            );
UNCOV
589
            self.switch_to_pending_chain(&split_info).await?;
×
UNCOV
590
            has_switched_to_new_chain = true;
×
591
        }
×
592

UNCOV
593
        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
×
594
            // Peer returned less than the max number of requested headers. This indicates that we have all the
595
            // available headers from the peer.
UNCOV
596
            if !has_better_pow {
×
597
                // Because the pow is less or equal than the current chain the peer had to have lied about their pow
598
                debug!(target: LOG_TARGET, "No further headers to download");
×
599
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
600
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
601
                    actual: Some(total_accumulated_difficulty),
×
602
                    local: split_info
×
603
                        .best_block_header
×
604
                        .accumulated_data()
×
605
                        .total_accumulated_difficulty,
×
606
                });
×
UNCOV
607
            }
×
UNCOV
608
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
×
UNCOV
609
            // to block sync.
×
UNCOV
610
            return Ok(());
×
611
        }
×
612

×
613
        debug!(
×
614
            target: LOG_TARGET,
×
615
            "Download remaining headers starting from header #{} from peer `{}`",
×
616
            start_header_height,
×
617
            sync_peer.node_id()
×
618
        );
619
        let request = SyncHeadersRequest {
×
620
            start_hash: start_header_hash.to_vec(),
×
621
            // To the tip!
×
622
            count: 0,
×
623
        };
×
624

625
        let mut header_stream = client.sync_headers(request).await?;
×
626
        debug!(
×
627
            target: LOG_TARGET,
×
628
            "Reading headers from peer `{}`",
×
629
            sync_peer.node_id()
×
630
        );
631

632
        let mut last_sync_timer = Instant::now();
×
633

×
634
        let mut last_total_accumulated_difficulty = U512::zero();
×
635
        let mut avg_latency = RollingAverageTime::new(20);
×
636
        let mut prev_height: Option<u64> = None;
×
637
        while let Some(header) = header_stream.next().await {
×
638
            let latency = last_sync_timer.elapsed();
×
639
            avg_latency.add_sample(latency);
×
640
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
×
641
            debug!(
×
642
                target: LOG_TARGET,
×
643
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
×
644
                header.height,
×
645
                header.pow_algo(),
×
646
                header.hash().to_hex(),
×
647
                latency
648
            );
649
            trace!(
×
650
                target: LOG_TARGET,
×
651
                "{}",
×
652
                header
653
            );
654
            if let Some(prev_header_height) = prev_height {
×
655
                if header.height != prev_header_height.saturating_add(1) {
×
656
                    warn!(
×
657
                        target: LOG_TARGET,
×
658
                        "Received header #{} `{}` does not follow previous header",
×
659
                        header.height,
×
660
                        header.hash().to_hex()
×
661
                    );
662
                    return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
663
                        "Header does not follow previous header".to_string(),
×
664
                    ));
×
665
                }
×
666
            }
×
667
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
×
668
            if let Some(h) = existing_header {
×
669
                warn!(
×
670
                    target: LOG_TARGET,
×
671
                    "Received header #{} `{}` that we already have.",
×
672
                    h.height,
×
673
                    h.hash().to_hex()
×
674
                );
675
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
676
                    "Header already in database".to_string(),
×
677
                ));
×
678
            }
×
679
            let current_height = header.height;
×
680
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;
×
681

682
            if has_switched_to_new_chain {
×
683
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
684
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
×
685
                    self.commit_pending_headers().await?;
×
686
                }
×
687
            } else {
688
                // The remote chain has not (yet) been accepted.
689
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
690
                // achieved.
691
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
×
692
                    self.switch_to_pending_chain(&split_info).await?;
×
693
                    has_switched_to_new_chain = true;
×
694
                }
×
695
            }
696

697
            sync_peer.set_latency(latency);
×
698
            sync_peer.add_sample(last_sync_timer.elapsed());
×
699
            self.hooks.call_on_progress_header_hooks(
×
700
                current_height,
×
701
                sync_peer.claimed_chain_metadata().best_block_height(),
×
702
                &sync_peer,
×
703
            );
×
704

×
705
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
×
706
            if let Some(avg_latency) = last_avg_latency {
×
707
                if avg_latency > max_latency {
×
708
                    return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
709
                        peer: sync_peer.node_id().clone(),
×
710
                        latency: avg_latency,
×
711
                        max_latency,
×
712
                    });
×
713
                }
×
714
            }
×
715

716
            last_sync_timer = Instant::now();
×
717
            prev_height = Some(current_height);
×
718
        }
719

720
        if !has_switched_to_new_chain {
×
721
            if sync_peer.claimed_chain_metadata().accumulated_difficulty() <
×
722
                self.header_validator
×
723
                    .current_valid_chain_tip_header()
×
724
                    .map(|h| h.accumulated_data().total_accumulated_difficulty)
×
725
                    .unwrap_or_default()
×
726
            {
727
                // We should only return this error if the peer sent a PoW less than they advertised.
728
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
729
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
730
                    actual: self
×
731
                        .header_validator
×
732
                        .current_valid_chain_tip_header()
×
733
                        .map(|h| h.accumulated_data().total_accumulated_difficulty),
×
734
                    local: split_info
×
735
                        .best_block_header
×
736
                        .accumulated_data()
×
737
                        .total_accumulated_difficulty,
×
738
                });
×
739
            } else {
740
                warn!(
×
741
                    target: LOG_TARGET,
×
742
                    "Received pow from peer matches claimed, difficulty #{} but local is higher: ({}) and we have not \
×
743
                     swapped. Ignoring",
×
744
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
745
                    split_info
×
746
                        .best_block_header
×
747
                        .accumulated_data()
×
748
                        .total_accumulated_difficulty
749
                );
750
                return Ok(());
×
751
            }
752
        }
×
753

×
754
        // Commit the last blocks that don't fit into the COMMIT_EVENT_N_HEADERS blocks
×
755
        if !self.header_validator.valid_headers().is_empty() {
×
756
            self.commit_pending_headers().await?;
×
757
        }
×
758

759
        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
×
760
        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
×
761
        // some other external factor like a disconnect etc), we detect the and ban the peer.
×
762
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
×
763
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
764
                claimed: claimed_total_accumulated_diff,
×
765
                actual: Some(last_total_accumulated_difficulty),
×
766
                local: split_info
×
767
                    .best_block_header
×
768
                    .accumulated_data()
×
769
                    .total_accumulated_difficulty,
×
770
            });
×
771
        }
×
772

×
773
        Ok(())
×
UNCOV
774
    }
×
775

UNCOV
776
    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
×
UNCOV
777
        let chain_headers = self.header_validator.take_valid_headers();
×
UNCOV
778
        let num_headers = chain_headers.len();
×
UNCOV
779
        let start = Instant::now();
×
UNCOV
780

×
UNCOV
781
        let new_tip = chain_headers.last().cloned().unwrap();
×
UNCOV
782
        let mut txn = self.db.write_transaction();
×
UNCOV
783
        chain_headers.into_iter().for_each(|chain_header| {
×
UNCOV
784
            txn.insert_chain_header(chain_header);
×
UNCOV
785
        });
×
UNCOV
786

×
UNCOV
787
        txn.commit().await?;
×
788

UNCOV
789
        debug!(
×
790
            target: LOG_TARGET,
×
791
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
×
792
            num_headers,
×
793
            new_tip.height(),
×
794
            start.elapsed()
×
795
        );
796

UNCOV
797
        Ok(new_tip)
×
UNCOV
798
    }
×
799

UNCOV
800
    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
×
UNCOV
801
        let chain_headers = self.header_validator.valid_headers();
×
UNCOV
802
        if chain_headers.is_empty() {
×
803
            return false;
×
UNCOV
804
        }
×
UNCOV
805

×
UNCOV
806
        // Check that the remote tip is stronger than the local tip, equal should not have ended up here, so we treat
×
UNCOV
807
        // equal as less
×
UNCOV
808
        let proposed_tip = chain_headers.last().unwrap();
×
UNCOV
809
        self.header_validator.compare_chains(current_tip, proposed_tip).is_lt()
×
UNCOV
810
    }
×
811

UNCOV
812
    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
×
UNCOV
813
        // Reorg if required
×
UNCOV
814
        if split_info.reorg_steps_back > 0 {
×
UNCOV
815
            debug!(
×
816
                target: LOG_TARGET,
×
817
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
×
818
                split_info.reorg_steps_back,
×
819
                split_info.chain_split_hash.to_hex()
×
820
            );
UNCOV
821
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
×
UNCOV
822
            if !blocks.is_empty() {
×
UNCOV
823
                self.hooks.call_on_rewind_hooks(blocks);
×
UNCOV
824
            }
×
UNCOV
825
        }
×
826

827
        // Commit the forked chain. At this point
828
        // 1. Headers have been validated
829
        // 2. The forked chain has a higher PoW than the local chain
830
        //
831
        // After this we commit headers every `n` blocks
UNCOV
832
        self.commit_pending_headers().await?;
×
833

UNCOV
834
        Ok(())
×
UNCOV
835
    }
×
836

837
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
838
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
10✔
839
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
10✔
840
            self.sync_peers.remove(pos);
10✔
841
        }
10✔
842
    }
10✔
843

844
    // Helper function to get the index to the node_id inside of the vec of peers
845
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
10✔
846
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
10✔
847
    }
10✔
848
}
849

850
#[derive(Debug, Clone)]
851
struct FindChainSplitResult {
852
    reorg_steps_back: u64,
853
    peer_headers: Vec<ProtoBlockHeader>,
854
    peer_fork_hash_index: u64,
855
    chain_split_hash: HashOutput,
856
}
857

858
/// Information about the chain split from the remote node.
859
#[derive(Debug, Clone, PartialEq)]
860
pub struct ChainSplitInfo {
861
    /// The best block's header on the local chain.
862
    pub best_block_header: ChainHeader,
863
    /// The number of blocks to reorg back to the fork.
864
    pub reorg_steps_back: u64,
865
    /// The hash of the block at the fork.
866
    pub chain_split_hash: HashOutput,
867
}
868

869
/// The result of an attempt to synchronize headers with a peer.
870
#[derive(Debug, Clone, PartialEq)]
871
pub struct AttemptSyncResult {
872
    /// The number of headers that were returned.
873
    pub headers_returned: u64,
874
    /// The fork hash index of the remote peer.
875
    pub peer_fork_hash_index: u64,
876
    /// The header sync status.
877
    pub header_sync_status: HeaderSyncStatus,
878
}
879

880
#[derive(Debug, Clone, PartialEq)]
881
pub enum HeaderSyncStatus {
882
    /// Local and remote node are in sync or ahead
883
    InSyncOrAhead,
884
    /// Local node is lagging behind remote node
885
    Lagging(Box<ChainSplitInfo>),
886
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc