tari-project / tari / 17328022990

29 Aug 2025 03:33PM UTC coverage: 61.105% (+1.0%) from 60.114%

Build 17328022990 · push · github · web-flow
feat: improve header sync (#7421)

Description
---
When we have a valid PoW equal to or higher than our own, or than what the
sync peer has advertised, we need to commit the headers so that any
downstream block-sync error does not invalidate the headers that have
already been downloaded.

_**Edit:** When block sync fails, we also make sure to swap to the best PoW
chain, preserving the banked headers if we do not need to reorg._
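
In short: commit once the validated PoW is at least as good as our own chain's or the peer's claim. A minimal sketch of that rule in isolation (hypothetical function and parameter names, not the implementation in this PR):

```rust
use primitive_types::U512;

/// Sketch only: commit banked headers once their validated accumulated PoW is
/// at least our own chain's, or at least what the sync peer advertised, so a
/// later block-sync failure cannot discard already-verified work.
fn should_commit_headers(validated: U512, local: U512, claimed_by_peer: U512) -> bool {
    validated >= local || validated >= claimed_by_peer
}

fn main() {
    let (validated, local, claimed) = (U512::from(120u64), U512::from(100u64), U512::from(150u64));
    // Validated work beats our local tip, so the headers are committed even
    // though the peer over-claimed; block sync may now fail without losing them.
    assert!(should_commit_headers(validated, local, claimed));
}
```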

Closes #7342

Motivation and Context
---
See #7342

How Has This Been Tested?
---
System-level tests passed:
- Syncing a previously synced node that was a couple of days behind the tip.
- Syncing from scratch, encountering multiple block-sync failures, but
  reusing the good-PoW headers already banked.
```text
   51952: 2025-08-18 16:07:02.097680100 [c::bn::block_sync] DEBUG Validating block body #22624 (PoW = RandomXTari, input(s): 1, output(s): 54, kernel(s): 2, latency: 108.00µs)
   51961: 2025-08-18 16:07:02.111648900 [c::bn::block_sync] DEBUG Validated in 14ms. Storing block body #22624 (PoW = RandomXTari, input(s): 1, output(s): 54, kernel(s): 2)
   51962: 2025-08-18 16:07:02.111668100 [c::bn::block_sync] TRACE Hash: 91a1fbccc1f94e2aa93b6efdaf9c3fbf16d0588c3458b7ef444e605f62375077
   54024: 2025-08-18 16:11:06.389482300 [c::bn::block_sync] WARN  Peer did not supply all the blocks they claimed they had: Their claim - height: 74721, accumulated difficulty: 4089235547434023187024157117581153168163715651441378. Our status after block sync - height: 22624, accumulated difficulty: 1547640955349056263010512134240412613668889130919000
   54028: 2025-08-18 16:11:06.390047900 [c::bn::block_sync] WARN  Block sync failed - best header: 74723/1bcf293971df2d6c888a6589dca3a323b, best block: 22624/91a1fbccc1f94e2aa93b6efdaf9c3fbf16d0588c3458b7ef444e605f62375077. No more sync peers available: Block sync failed
   54109: 2025-08-18 16:11:11.596105400 [c::bn::header_sync] DEBUG Starting header sync.
   54110: 2025-08-18 16:11:11.59617... (continued)
```

70 of 132 new or added lines in 3 files covered. (53.03%)

11 existing lines in 6 files now uncovered.

72705 of 118983 relevant lines covered (61.11%)

229424.73 hits per line

Source File
---
/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs

```rust
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use primitive_types::U512;
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
use tari_comms::{
    connectivity::ConnectivityRequester,
    peer_manager::NodeId,
    protocol::rpc::{RpcClient, RpcError},
    PeerConnection,
};
use tari_node_components::blocks::BlockHeader;
use tari_transaction_components::BanPeriod;
use tari_utilities::hex::Hex;

pub(crate) use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
use crate::{
    base_node::sync::{
        ban::PeerBanManager,
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
        hooks::Hooks,
        rpc,
        BlockchainSyncConfig,
        SyncPeer,
    },
    blocks::{ChainBlock, ChainHeader},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
    common::rolling_avg::RollingAverageTime,
    consensus::BaseNodeConsensusManager,
    proof_of_work::randomx_factory::RandomXFactory,
    proto::{
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
        core::BlockHeader as ProtoBlockHeader,
    },
};

const LOG_TARGET: &str = "c::bn::header_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct HeaderSynchronizer<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    header_validator: BlockHeaderSyncValidator<B>,
    connectivity: ConnectivityRequester,
    sync_peers: &'a mut Vec<SyncPeer>,
    hooks: Hooks,
    local_cached_metadata: &'a ChainMetadata,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        consensus_rules: BaseNodeConsensusManager,
        connectivity: ConnectivityRequester,
        sync_peers: &'a mut Vec<SyncPeer>,
        randomx_factory: RandomXFactory,
        local_metadata: &'a ChainMetadata,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            config,
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
            db,
            connectivity,
            sync_peers,
            hooks: Default::default(),
            local_cached_metadata: local_metadata,
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_progress_header_hook(hook);
    }

    pub fn on_rewind<H>(&mut self, hook: H)
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
        self.hooks.add_on_rewind_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        debug!(target: LOG_TARGET, "Starting header sync.",);

        info!(
            target: LOG_TARGET,
            "Synchronizing headers ({} candidate peers selected)",
            self.sync_peers.len()
        );
        let mut max_latency = self.config.initial_max_sync_latency;
        let mut latency_increases_counter = 0;
        loop {
            match self.try_sync_from_all_peers(max_latency).await {
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
                    // If we have few sync peers, throw this out to be retried later
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    }
                    max_latency += self.config.max_latency_increase;
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                },
                Err(err) => break Err(err),
            }
        }
    }

    #[allow(clippy::too_many_lines)]
    pub async fn try_sync_from_all_peers(
        &mut self,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync headers ({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
                Err(err) => {
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
                    if let Some(reason) = ban_reason {
                        warn!(target: LOG_TARGET, "{err}");
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
        }
    }

    async fn connect_and_attempt_sync(
        &mut self,
        node_id: &NodeId,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let peer_index = self
            .get_sync_peer_index(node_id)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
        self.hooks.call_on_starting_hook(sync_peer);

        let mut conn = self.dial_sync_peer(node_id).await?;
        debug!(
            target: LOG_TARGET,
            "Attempting to synchronize headers with `{node_id}`"
        );

        let config = RpcClient::builder()
            .with_deadline(self.config.rpc_deadline)
            .with_deadline_grace_period(Duration::from_secs(5));
        let mut client = conn
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
            .await?;

        let latency = client
            .get_last_request_latency()
            .expect("unreachable panic: last request latency must be set after connect");
        self.sync_peers
            .get_mut(peer_index)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
            .set_latency(latency);
        if latency > max_latency {
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                peer: conn.peer_node_id().clone(),
                latency,
                max_latency,
            });
        }

        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
        let sync_peer = self
            .sync_peers
            .get(peer_index)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
            .clone();
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
        Ok((sync_peer, sync_result))
    }

    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
        let timer = Instant::now();
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
        info!(
            target: LOG_TARGET,
            "Successfully dialed sync peer {} in {:.2?}",
            node_id,
            timer.elapsed()
        );
        Ok(conn)
    }

    async fn attempt_sync(
        &mut self,
        sync_peer: &SyncPeer,
        mut client: rpc::BaseNodeSyncRpcClient,
        max_latency: Duration,
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
            sync_peer.node_id(),
            latency.unwrap_or_default().as_millis()
        );

        // Fetch best local data at the beginning of the sync process
        let best_block_metadata = self.db.get_chain_metadata().await?;
        let best_header = self.db.fetch_last_chain_header().await?;
        let best_block_header = self
            .db
            .fetch_chain_header(best_block_metadata.best_block_height())
            .await?;
        let best_header_height = best_header.height();
        let best_block_height = best_block_header.height();

        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
        {
            return Err(BlockHeaderSyncError::ChainStorageError(
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
            ));
        }

        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
        //   peer, but they claimed better POW before we attempted sync.
        // - This method will return ban-able errors for certain offenses.
        let (header_sync_status, peer_response) = self
            .determine_sync_status(sync_peer, best_header.clone(), best_block_header.clone(), &mut client)
            .await?;

        match header_sync_status.clone() {
            HeaderSyncStatus::InSyncOrAhead => {
                debug!(
                    target: LOG_TARGET,
                    "Headers are in sync at height {best_header_height} but tip is {best_block_height}. Proceeding to archival/pruned block sync"
                );

                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
            HeaderSyncStatus::Lagging(split_info) => {
                self.hooks.call_on_progress_header_hooks(
                    split_info
                        .best_block_header
                        .height()
                        .checked_sub(split_info.reorg_steps_back)
                        .unwrap_or_default(),
                    sync_peer.claimed_chain_metadata().best_block_height(),
                    sync_peer,
                );
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
                    .await?;
                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
        }
    }

    #[allow(clippy::too_many_lines)]
    async fn find_chain_split(
        &mut self,
        peer_node_id: &NodeId,
        client: &mut rpc::BaseNodeSyncRpcClient,
        header_count: u64,
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
        // and keep us busy going back until the genesis.
        // 20 x 500 = max 10,000 block split can be detected
        const MAX_CHAIN_SPLIT_ITERS: usize = 20;

        let mut offset = 0;
        let mut iter_count = 0;
        loop {
            iter_count += 1;
            if iter_count > MAX_CHAIN_SPLIT_ITERS {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                    peer_node_id,
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
                );
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
            }

            let block_hashes = self
                .db
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
                .await?;
            debug!(
                target: LOG_TARGET,
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
                offset,
                offset + NUM_CHAIN_SPLIT_HEADERS,
                peer_node_id,
                block_hashes.len()
            );

            // No further hashes to send.
            if block_hashes.is_empty() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                    peer_node_id,
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
                );
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
            }

            let request = FindChainSplitRequest {
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
                header_count,
            };

            let resp = match client.find_chain_split(request).await {
                Ok(r) => r,
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
                    // send. Exit early in this case.
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
                        warn!(
                            target: LOG_TARGET,
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                            peer_node_id,
                            NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
                        );
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
                    }
                    // Chain split not found, let's go further back
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
                    continue;
                },
                Err(err) => {
                    return Err(err.into());
                },
            };
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
                    peer_node_id,
                    resp.headers.len(),
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
                );
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
            }
            if resp.fork_hash_index >= block_hashes.len() as u64 {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
                    peer_node_id,
                    resp.fork_hash_index,
                    block_hashes.len(),
                );
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
                    block_hashes.len() as u64,
                    resp.fork_hash_index,
                ));
            }
            #[allow(clippy::cast_possible_truncation)]
            if !resp.headers.is_empty() &&
                *resp.headers.first().expect("Already checked").prev_hash !=
                    *block_hashes
                        .get(resp.fork_hash_index as usize)
                        .expect("Already checked")
            {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
                    peer_node_id,
                    resp.fork_hash_index,
                );
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
                    "Peer sent incorrect fork hash index".into(),
                ));
            }
            #[allow(clippy::cast_possible_truncation)]
            let chain_split_hash = *block_hashes
                .get(resp.fork_hash_index as usize)
                .expect("Already checked");

            return Ok(FindChainSplitResult {
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
                peer_headers: resp.headers,
                peer_fork_hash_index: resp.fork_hash_index,
                chain_split_hash,
            });
        }
    }

    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
    /// of the chain split (see [HeaderSyncStatus]).
    ///
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
    async fn determine_sync_status(
        &mut self,
        sync_peer: &SyncPeer,
        best_header: ChainHeader,
        best_block_header: ChainHeader,
        client: &mut rpc::BaseNodeSyncRpcClient,
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
        // This method will return ban-able errors for certain offenses.
        let chain_split_result = self
            .find_chain_split(sync_peer.node_id(), client, HEADER_SYNC_INITIAL_MAX_HEADERS as u64)
            .await?;
        if chain_split_result.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Found chain split {} blocks back, received {} headers from peer `{}`",
                chain_split_result.reorg_steps_back,
                chain_split_result.peer_headers.len(),
                sync_peer
            );
        }

        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
        // accumulated difficulty.
        if chain_split_result.peer_headers.is_empty() {
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
            // we can ban the peer.
            if best_header.height() == best_block_header.height() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
                    sync_peer.node_id(),
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    best_block_header.accumulated_data().total_accumulated_difficulty,
                );
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: None,
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
                });
            }
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
        }

        let headers = chain_split_result
            .peer_headers
            .clone()
            .into_iter()
            .map(BlockHeader::try_from)
            .collect::<Result<Vec<_>, _>>()
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
        let num_new_headers = headers.len();
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
        // the list
        if self
            .db
            .fetch_header_by_block_hash(headers.first().expect("Already checked").hash())
            .await?
            .is_some()
        {
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                "Header already in database".to_string(),
            ));
        };

        self.header_validator
            .initialize_state(&chain_split_result.chain_split_hash)
            .await?;
        for header in headers {
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({})",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
            );
            self.header_validator.validate(header).await?;
        }

        debug!(
            target: LOG_TARGET,
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
        );

        let chain_split_info = ChainSplitInfo {
            best_block_header,
            reorg_steps_back: chain_split_result.reorg_steps_back,
            chain_split_hash: chain_split_result.chain_split_hash,
        };
        Ok((
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
            chain_split_result,
        ))
    }

    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
        debug!(
            target: LOG_TARGET,
            "Deleting headers that no longer form part of the main chain up until split at {}",
            split_hash.to_hex()
        );

        let blocks = self.db.rewind_to_hash(split_hash).await?;
        debug!(
            target: LOG_TARGET,
            "Rewound {} block(s) in preparation for header sync",
            blocks.len()
        );
        Ok(blocks)
    }

    #[allow(clippy::too_many_lines)]
    async fn synchronize_headers(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        split_info: ChainSplitInfo,
        max_latency: Duration,
    ) -> Result<(), BlockHeaderSyncError> {
        info!(target: LOG_TARGET, "Starting header sync from peer {sync_peer}");
        const COMMIT_EVERY_N_HEADERS: usize = 1000;

        let mut has_switched_to_new_chain = false;
        let pending_len = self.header_validator.valid_headers().len();

        // Find the hash to start syncing the rest of the headers.
        // The expectation cannot fail because there has been at least one valid header returned (checked in
        // determine_sync_status)
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
            .header_validator
            .current_valid_chain_tip_header()
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");

        // If we already have a stronger chain at this point, switch over to it.
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);

        if has_better_pow {
            debug!(
                target: LOG_TARGET,
                "Remote chain from peer {} has higher PoW. Switching",
                sync_peer.node_id()
            );
            self.switch_to_pending_chain(&split_info).await?;
            has_switched_to_new_chain = true;
        }

        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
            // Peer returned less than the max number of requested headers. This indicates that we have all the
            // available headers from the peer.
            if !has_better_pow {
                // Because the pow is less or equal than the current chain the peer had to have lied about their pow
                debug!(target: LOG_TARGET, "No further headers to download");
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: Some(total_accumulated_difficulty),
                    local: split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty,
                });
            }
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
            // to block sync.
            return Ok(());
        }

        debug!(
            target: LOG_TARGET,
            "Download remaining headers starting from header #{} from peer `{}`",
            start_header_height,
            sync_peer.node_id()
        );
        let request = SyncHeadersRequest {
            start_hash: start_header_hash.to_vec(),
            // To the tip!
            count: 0,
        };

        let mut header_stream = client.sync_headers(request).await?;
        debug!(
            target: LOG_TARGET,
            "Reading headers from peer `{}`",
            sync_peer.node_id()
        );

        let mut last_sync_timer = Instant::now();

        let mut last_total_accumulated_difficulty = U512::zero();
        let mut avg_latency = RollingAverageTime::new(20);
        let mut prev_height: Option<u64> = None;
        while let Some(header) = header_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
                latency
            );
            trace!(
                target: LOG_TARGET,
                "{header}"
            );
            if let Some(prev_header_height) = prev_height {
                if header.height != prev_header_height.saturating_add(1) {
                    warn!(
                        target: LOG_TARGET,
                        "Received header #{} `{}` does not follow previous header",
                        header.height,
                        header.hash().to_hex()
                    );
                    return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                        "Header does not follow previous header".to_string(),
                    ));
                }
            }
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
            if let Some(h) = existing_header {
                warn!(
                    target: LOG_TARGET,
                    "Received header #{} `{}` that we already have.",
                    h.height,
                    h.hash().to_hex()
                );
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                    "Header already in database".to_string(),
                ));
            }
            let current_height = header.height;
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;

            if has_switched_to_new_chain {
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
                    self.commit_pending_headers().await?;
                }
            } else {
                // The remote chain has not (yet) been accepted.
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
                // achieved.
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                    self.switch_to_pending_chain(&split_info).await?;
                    has_switched_to_new_chain = true;
                }
            }

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            self.hooks.call_on_progress_header_hooks(
                current_height,
                sync_peer.claimed_chain_metadata().best_block_height(),
                &sync_peer,
            );

            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
            if let Some(avg_latency) = last_avg_latency {
                if avg_latency > max_latency {
                    return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                        peer: sync_peer.node_id().clone(),
                        latency: avg_latency,
                        max_latency,
                    });
                }
            }

            last_sync_timer = Instant::now();
            prev_height = Some(current_height);
        }

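        // After the header stream ends: if we did not switch chains during the
        // stream, decide now. Either the peer over-claimed PoW relative to what
        // their headers prove (ban-able below), or the fully validated pending
        // chain beats our local tip and we switch to it, committing the banked
        // headers.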
        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
        if !has_switched_to_new_chain {
            let best_local_before_sync = split_info
                .best_block_header
                .accumulated_data()
                .total_accumulated_difficulty;
            match self
                .header_validator
                .current_valid_chain_tip_header()
                .map(|h| h.accumulated_data().total_accumulated_difficulty)
            {
                Some(validated_total_accumulated_diff) => {
                    if claimed_total_accumulated_diff > validated_total_accumulated_diff {
                        // Over-claim: peer advertised more PoW than their headers actually provide.
                        return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                            claimed: claimed_total_accumulated_diff,
                            actual: Some(validated_total_accumulated_diff),
                            local: best_local_before_sync,
                        });
                    } else if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                        self.switch_to_pending_chain(&split_info).await?;
                        has_switched_to_new_chain = true;
                        info!(
                            target: LOG_TARGET,
                            "Received PoW from peer exceeds local tip. Before sync: {}, received: {}. Committed.",
                            best_local_before_sync,
                            validated_total_accumulated_diff,
                        );
                    } else {
                        // We have a stronger chain, so we do not commit the headers.
                        debug!(
                            target: LOG_TARGET,
                            "Not committing headers as we have a stronger chain, ours: {} theirs: {}",
                            best_local_before_sync,
                            validated_total_accumulated_diff,
                        );
                    };
                },
                None => {
                    // No validated headers at this stage, but there should have been.
                    return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                        claimed: claimed_total_accumulated_diff,
                        actual: None,
                        local: best_local_before_sync,
                    });
                },
            }
        }

        // Commit the last blocks only if we have switched to the new chain.
        if has_switched_to_new_chain && !self.header_validator.valid_headers().is_empty() {
            self.commit_pending_headers().await?;
        }

        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
        // some other external factor like a disconnect etc), we detect this and ban the peer.
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                claimed: claimed_total_accumulated_diff,
                actual: Some(last_total_accumulated_difficulty),
                local: split_info
                    .best_block_header
                    .accumulated_data()
                    .total_accumulated_difficulty,
            });
        }

        Ok(())
    }

    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
        let chain_headers = self.header_validator.take_valid_headers();
        let num_headers = chain_headers.len();
        let start = Instant::now();

        let new_tip = chain_headers.last().cloned().unwrap();
        let mut txn = self.db.write_transaction();
        chain_headers.into_iter().for_each(|chain_header| {
            txn.insert_chain_header(chain_header);
        });

        txn.commit().await?;

        debug!(
            target: LOG_TARGET,
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
            num_headers,
            new_tip.height(),
            start.elapsed()
        );

        Ok(new_tip)
    }

    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
        let chain_headers = self.header_validator.valid_headers();
        if chain_headers.is_empty() {
            return false;
        }

        // Check that the remote tip is stronger than the local tip, equal should not have ended up here, so we treat
        // equal as less
        let proposed_tip = chain_headers.last().unwrap();
        self.header_validator.compare_chains(current_tip, proposed_tip).is_lt()
    }

    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
        // Reorg if required
        if split_info.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
                split_info.reorg_steps_back,
                split_info.chain_split_hash.to_hex()
            );
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
            if !blocks.is_empty() {
                self.hooks.call_on_rewind_hooks(blocks);
            }
        }

        // Commit the forked chain. At this point
        // 1. Headers have been validated
        // 2. The forked chain has a higher PoW than the local chain
        //
        // After this we commit headers every `n` blocks
        self.commit_pending_headers().await?;

        Ok(())
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index to the node_id inside of the vec of peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }
}

#[derive(Debug, Clone)]
struct FindChainSplitResult {
    reorg_steps_back: u64,
    peer_headers: Vec<ProtoBlockHeader>,
    peer_fork_hash_index: u64,
    chain_split_hash: HashOutput,
}

/// Information about the chain split from the remote node.
#[derive(Debug, Clone, PartialEq)]
pub struct ChainSplitInfo {
    /// The best block's header on the local chain.
    pub best_block_header: ChainHeader,
    /// The number of blocks to reorg back to the fork.
    pub reorg_steps_back: u64,
    /// The hash of the block at the fork.
    pub chain_split_hash: HashOutput,
}

/// The result of an attempt to synchronize headers with a peer.
#[derive(Debug, Clone, PartialEq)]
pub struct AttemptSyncResult {
    /// The number of headers that were returned.
    pub headers_returned: u64,
    /// The fork hash index of the remote peer.
    pub peer_fork_hash_index: u64,
    /// The header sync status.
    pub header_sync_status: HeaderSyncStatus,
}

#[derive(Debug, Clone, PartialEq)]
pub enum HeaderSyncStatus {
    /// Local and remote node are in sync or ahead
    InSyncOrAhead,
    /// Local node is lagging behind remote node
    Lagging(Box<ChainSplitInfo>),
}
```
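
The latency back-off policy in `synchronize` above retries the whole peer list with a growing latency budget, giving up after `MAX_LATENCY_INCREASES` increases or when fewer than two candidate peers remain. A self-contained sketch of just that policy (simplified, hypothetical stand-in types; not the real config or error enums):

```rust
use std::time::Duration;

const MAX_LATENCY_INCREASES: usize = 5;

#[allow(dead_code)]
#[derive(Debug)]
enum SyncError {
    AllPeersExceedLatency,
    Fatal,
}

// Stand-in for one pass over every sync peer at the given latency budget.
fn try_sync_all_peers(max_latency: Duration) -> Result<(), SyncError> {
    if max_latency < Duration::from_secs(3) {
        Err(SyncError::AllPeersExceedLatency)
    } else {
        Ok(())
    }
}

fn synchronize(initial: Duration, increase: Duration, num_peers: usize) -> Result<(), SyncError> {
    let mut max_latency = initial;
    let mut increases = 0;
    loop {
        match try_sync_all_peers(max_latency) {
            Ok(()) => break Ok(()),
            // Only "every peer was too slow" earns a retry with a larger budget,
            // and only while more than one candidate peer is available.
            Err(err @ SyncError::AllPeersExceedLatency) => {
                if num_peers < 2 {
                    return Err(err);
                }
                max_latency += increase;
                increases += 1;
                if increases > MAX_LATENCY_INCREASES {
                    return Err(err);
                }
            },
            // Any other failure aborts immediately.
            Err(err) => break Err(err),
        }
    }
}

fn main() {
    // With a 1s starting budget and 1s increments, the third pass (3s) succeeds.
    assert!(synchronize(Duration::from_secs(1), Duration::from_secs(1), 3).is_ok());
}
```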