• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 27141517531

08 Jun 2026 01:37PM UTC coverage: 61.298% (-0.01%) from 61.308%
27141517531

push

github

SWvheerden
chore: new release v5.4.0-pre.3

72248 of 117863 relevant lines covered (61.3%)

221811.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.74
/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    sync::Arc,
24
    time::{Duration, Instant},
25
};
26

27
use futures::StreamExt;
28
use log::*;
29
use primitive_types::U512;
30
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
31
use tari_comms::{
32
    PeerConnection,
33
    RefKind,
34
    connectivity::ConnectivityRequester,
35
    peer_manager::NodeId,
36
    protocol::rpc::{RpcClient, RpcError},
37
};
38
use tari_node_components::blocks::{BlockHeader, ChainBlock, ChainHeader};
39
use tari_transaction_components::BanPeriod;
40
use tari_utilities::hex::Hex;
41

42
pub(crate) use super::{BlockHeaderSyncError, validator::BlockHeaderSyncValidator};
43
use crate::{
44
    base_node::sync::{
45
        BlockchainSyncConfig,
46
        SyncPeer,
47
        ban::PeerBanManager,
48
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
49
        hooks::Hooks,
50
        rpc,
51
    },
52
    chain_storage::{BlockchainBackend, ChainStorageError, async_db::AsyncBlockchainDb},
53
    common::rolling_avg::RollingAverageTime,
54
    consensus::BaseNodeConsensusManager,
55
    proof_of_work::randomx_factory::RandomXFactory,
56
    proto::{
57
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
58
        core::BlockHeader as ProtoBlockHeader,
59
    },
60
};
61

62
const LOG_TARGET: &str = "c::bn::header_sync";
63

64
const MAX_LATENCY_INCREASES: usize = 5;
65

66
pub struct HeaderSynchronizer<'a, B> {
67
    config: BlockchainSyncConfig,
68
    db: AsyncBlockchainDb<B>,
69
    header_validator: BlockHeaderSyncValidator<B>,
70
    connectivity: ConnectivityRequester,
71
    sync_peers: &'a mut Vec<SyncPeer>,
72
    hooks: Hooks,
73
    local_cached_metadata: &'a ChainMetadata,
74
    peer_ban_manager: PeerBanManager,
75
}
76

77
impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
78
    pub fn new(
17✔
79
        config: BlockchainSyncConfig,
17✔
80
        db: AsyncBlockchainDb<B>,
17✔
81
        consensus_rules: BaseNodeConsensusManager,
17✔
82
        connectivity: ConnectivityRequester,
17✔
83
        sync_peers: &'a mut Vec<SyncPeer>,
17✔
84
        randomx_factory: RandomXFactory,
17✔
85
        local_metadata: &'a ChainMetadata,
17✔
86
    ) -> Self {
17✔
87
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
17✔
88
        Self {
17✔
89
            config,
17✔
90
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
17✔
91
            db,
17✔
92
            connectivity,
17✔
93
            sync_peers,
17✔
94
            hooks: Default::default(),
17✔
95
            local_cached_metadata: local_metadata,
17✔
96
            peer_ban_manager,
17✔
97
        }
17✔
98
    }
17✔
99

100
    pub fn on_starting<H>(&mut self, hook: H)
17✔
101
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
17✔
102
        self.hooks.add_on_starting_hook(hook);
17✔
103
    }
17✔
104

105
    pub fn on_progress<H>(&mut self, hook: H)
17✔
106
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
17✔
107
        self.hooks.add_on_progress_header_hook(hook);
17✔
108
    }
17✔
109

110
    pub fn on_rewind<H>(&mut self, hook: H)
17✔
111
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
17✔
112
        self.hooks.add_on_rewind_hook(hook);
17✔
113
    }
17✔
114

115
    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
17✔
116
        debug!(target: LOG_TARGET, "Starting header sync.",);
17✔
117

118
        info!(
17✔
119
            target: LOG_TARGET,
×
120
            "Synchronizing headers ({} candidate peers selected)",
121
            self.sync_peers.len()
×
122
        );
123

124
        // Ensure every sync-candidate peer has a Strong PeerConnection attached for the
125
        // duration of header sync. The Strong handle bumps the per-connection strong counter,
126
        // signalling to background reapers (ConnectivityManager n-closest pruning,
127
        // inactive-connection reaping, DhtConnectivity random-pool pruning) that they must not
128
        // disconnect the peer. If a stored connection is already attached (e.g. from a prior
129
        // sync stage that downgraded it to Weak), upgrade in place rather than redialling.
130
        for peer in self.sync_peers.iter_mut() {
17✔
131
            if let Some(conn) = peer.connection() &&
17✔
132
                !conn.is_connected()
×
133
            {
×
134
                peer.clear_connection();
×
135
            }
17✔
136
            if peer.ensure_strong_connection() {
17✔
137
                continue;
×
138
            }
17✔
139
            match self
17✔
140
                .connectivity
17✔
141
                .dial_peer(peer.node_id().clone(), RefKind::Strong)
17✔
142
                .await
17✔
143
            {
144
                Ok(conn) => peer.set_connection(conn),
13✔
145
                Err(e) => debug!(
4✔
146
                    target: LOG_TARGET,
×
147
                    "Failed to dial sync peer {} as strong: {e}", peer.node_id()
×
148
                ),
149
            }
150
        }
151

152
        let result = self.synchronize_inner().await;
17✔
153

154
        // Sync is done (success or failure): downgrade every Strong handle to Weak so reapers
155
        // may reclaim the connection if needed. The connections remain attached on the
156
        // SyncPeers, so the next sync stage can upgrade them back in place without redialling.
157
        for peer in self.sync_peers.iter_mut() {
17✔
158
            peer.downgrade_connection();
12✔
159
        }
12✔
160

161
        result
17✔
162
    }
17✔
163

164
    async fn synchronize_inner(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
17✔
165
        let mut max_latency = self.config.initial_max_sync_latency;
17✔
166
        let mut latency_increases_counter = 0;
17✔
167
        loop {
168
            match self.try_sync_from_all_peers(max_latency).await {
17✔
169
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
12✔
170
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
×
171
                    // If we have few sync peers, throw this out to be retried later
172
                    if self.sync_peers.len() < 2 {
×
173
                        return Err(err);
×
174
                    }
×
175
                    max_latency += self.config.max_latency_increase;
×
176
                    latency_increases_counter += 1;
×
177
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
178
                        return Err(err);
×
179
                    }
×
180
                },
181
                Err(err) => break Err(err),
5✔
182
            }
183
        }
184
    }
17✔
185

186
    #[allow(clippy::too_many_lines)]
187
    pub async fn try_sync_from_all_peers(
17✔
188
        &mut self,
17✔
189
        max_latency: Duration,
17✔
190
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
17✔
191
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
17✔
192
        info!(
17✔
193
            target: LOG_TARGET,
×
194
            "Attempting to sync headers ({} sync peers)",
195
            sync_peer_node_ids.len()
×
196
        );
197
        let mut latency_counter = 0usize;
17✔
198
        for node_id in sync_peer_node_ids {
17✔
199
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
17✔
200
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
12✔
201
                Err(err) => {
5✔
202
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
5✔
203
                    if let Some(reason) = ban_reason {
5✔
204
                        warn!(target: LOG_TARGET, "{err}");
1✔
205
                        let duration = match reason.ban_duration {
1✔
206
                            BanPeriod::Short => self.config.short_ban_period,
1✔
207
                            BanPeriod::Long => self.config.ban_period,
×
208
                        };
209
                        self.peer_ban_manager
1✔
210
                            .ban_peer_if_required(&node_id, reason.reason, duration)
1✔
211
                            .await;
1✔
212
                    }
4✔
213
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
5✔
214
                        latency_counter += 1;
×
215
                    } else {
5✔
216
                        self.remove_sync_peer(&node_id);
5✔
217
                    }
5✔
218
                },
219
            }
220
        }
221

222
        if self.sync_peers.is_empty() {
5✔
223
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
5✔
224
        } else if latency_counter >= self.sync_peers.len() {
×
225
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
×
226
        } else {
227
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
×
228
        }
229
    }
17✔
230

231
    async fn connect_and_attempt_sync(
17✔
232
        &mut self,
17✔
233
        node_id: &NodeId,
17✔
234
        max_latency: Duration,
17✔
235
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
17✔
236
        let peer_index = self
17✔
237
            .get_sync_peer_index(node_id)
17✔
238
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
17✔
239
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
17✔
240
        self.hooks.call_on_starting_hook(sync_peer);
17✔
241

242
        // Prefer the connection pre-dialled (as Strong) in `synchronize()` and stashed on the
243
        // SyncPeer. Hand the per-attempt code a Weak clone — the strong handle living on the
244
        // SyncPeer keeps the connection pinned for the full sync, while dropping this clone
245
        // when the attempt ends has no effect on the counter.
246
        let mut conn = match sync_peer.connection() {
17✔
247
            Some(stored) if stored.is_connected() => stored.clone_weak(),
13✔
248
            _ => self.dial_sync_peer(node_id).await?,
4✔
249
        };
250
        debug!(
13✔
251
            target: LOG_TARGET,
×
252
            "Attempting to synchronize headers with `{node_id}`"
253
        );
254
        // Defensive: the connection may have been torn down by another subsystem between
255
        // dial returning and this point (e.g. DhtConnectivity pruning). This is not the peer's
256
        // fault, so use NotInSync (no ban) to trigger a skip-and-retry with the next peer.
257
        if !conn.is_connected() {
13✔
258
            warn!(
×
259
                target: LOG_TARGET,
×
260
                "Sync peer `{node_id}` was disconnected before RPC negotiation could begin"
261
            );
262
            return Err(BlockHeaderSyncError::NotInSync);
×
263
        }
13✔
264

265
        let config = RpcClient::builder()
13✔
266
            .with_deadline(self.config.rpc_deadline)
13✔
267
            .with_deadline_grace_period(Duration::from_secs(5));
13✔
268
        // Bound RPC negotiation so a stuck negotiation cannot wedge the sync loop.
269
        let mut client = tokio::time::timeout(
13✔
270
            self.config.rpc_deadline,
13✔
271
            conn.connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config),
13✔
272
        )
13✔
273
        .await
13✔
274
        .map_err(|_| BlockHeaderSyncError::RpcError(RpcError::ReplyTimeout))??;
13✔
275

276
        let latency = client
13✔
277
            .get_last_request_latency()
13✔
278
            .expect("unreachable panic: last request latency must be set after connect");
13✔
279
        self.sync_peers
13✔
280
            .get_mut(peer_index)
13✔
281
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
13✔
282
            .set_latency(latency);
13✔
283
        if latency > max_latency {
13✔
284
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
285
                peer: conn.peer_node_id().clone(),
×
286
                latency,
×
287
                max_latency,
×
288
            });
×
289
        }
13✔
290

291
        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
13✔
292
        let sync_peer = self
13✔
293
            .sync_peers
13✔
294
            .get(peer_index)
13✔
295
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
13✔
296
            .clone();
13✔
297
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
13✔
298
        Ok((sync_peer, sync_result))
12✔
299
    }
17✔
300

301
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
4✔
302
        let timer = Instant::now();
4✔
303
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
4✔
304
        // `synchronize()` holds Strong references to every sync peer for the duration of this
305
        // sync, so the per-attempt redial can be Weak — the strong handles in the outer guard
306
        // list keep the connection pinned.
307
        let conn = self.connectivity.dial_peer(node_id.clone(), RefKind::Weak).await?;
4✔
308
        info!(
×
309
            target: LOG_TARGET,
×
310
            "Successfully dialed sync peer {} in {:.2?}",
311
            node_id,
312
            timer.elapsed()
×
313
        );
314
        Ok(conn)
×
315
    }
4✔
316

317
    async fn attempt_sync(
13✔
318
        &mut self,
13✔
319
        sync_peer: &SyncPeer,
13✔
320
        mut client: rpc::BaseNodeSyncRpcClient,
13✔
321
        max_latency: Duration,
13✔
322
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
13✔
323
        let latency = client.get_last_request_latency();
13✔
324
        debug!(
13✔
325
            target: LOG_TARGET,
×
326
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
327
            sync_peer.node_id(),
×
328
            latency.unwrap_or_default().as_millis()
×
329
        );
330

331
        // Fetch best local data at the beginning of the sync process
332
        let best_block_metadata = self.db.get_chain_metadata().await?;
13✔
333
        let best_header = self.db.fetch_last_chain_header().await?;
13✔
334
        let best_block_header = self
13✔
335
            .db
13✔
336
            .fetch_chain_header(best_block_metadata.best_block_height())
13✔
337
            .await?;
13✔
338
        let best_header_height = best_header.height();
13✔
339
        let best_block_height = best_block_header.height();
13✔
340

341
        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
13✔
342
        {
343
            return Err(BlockHeaderSyncError::ChainStorageError(
×
344
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
×
345
            ));
×
346
        }
13✔
347

348
        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
349
        //   peer, but they claimed better POW before we attempted sync.
350
        // - This method will return ban-able errors for certain offenses.
351
        let (header_sync_status, peer_response) = self
13✔
352
            .determine_sync_status(
13✔
353
                sync_peer,
13✔
354
                best_header.clone(),
13✔
355
                best_block_header.clone(),
13✔
356
                self.config.max_reorg_depth_allowed,
13✔
357
                &mut client,
13✔
358
            )
13✔
359
            .await?;
13✔
360

361
        match header_sync_status.clone() {
12✔
362
            HeaderSyncStatus::InSyncOrAhead => {
363
                debug!(
3✔
364
                    target: LOG_TARGET,
×
365
                    "Headers are in sync at height {best_header_height} but tip is {best_block_height}. Proceeding to archival/pruned block sync"
366
                );
367

368
                Ok(AttemptSyncResult {
3✔
369
                    headers_returned: peer_response.peer_headers.len() as u64,
3✔
370
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
3✔
371
                    header_sync_status,
3✔
372
                })
3✔
373
            },
374
            HeaderSyncStatus::Lagging(split_info) => {
9✔
375
                self.hooks.call_on_progress_header_hooks(
9✔
376
                    split_info
9✔
377
                        .best_block_header
9✔
378
                        .height()
9✔
379
                        .saturating_sub(split_info.reorg_steps_back),
9✔
380
                    sync_peer.claimed_chain_metadata().best_block_height(),
9✔
381
                    sync_peer,
9✔
382
                );
383
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
9✔
384
                    .await?;
9✔
385
                Ok(AttemptSyncResult {
9✔
386
                    headers_returned: peer_response.peer_headers.len() as u64,
9✔
387
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
9✔
388
                    header_sync_status,
9✔
389
                })
9✔
390
            },
391
        }
392
    }
13✔
393

394
    #[allow(clippy::too_many_lines)]
395
    async fn find_chain_split(
13✔
396
        &mut self,
13✔
397
        peer_node_id: &NodeId,
13✔
398
        max_reorg_depth_allowed: usize,
13✔
399
        client: &mut rpc::BaseNodeSyncRpcClient,
13✔
400
        header_count: u64,
13✔
401
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
13✔
402
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
403
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
404
        // and keep us busy going back until the genesis.
405
        // 20 x 500 = max 10,000 block split can be detected. The 10_000 limit is default, but can be overridden
406
        let max_chain_split_iters = max_reorg_depth_allowed.saturating_div(NUM_CHAIN_SPLIT_HEADERS);
13✔
407

408
        let mut offset = 0;
13✔
409
        let mut iter_count = 0;
13✔
410
        loop {
411
            iter_count += 1;
13✔
412
            if iter_count > max_chain_split_iters {
13✔
413
                warn!(
×
414
                    target: LOG_TARGET,
×
415
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
416
                    peer_node_id,
417
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
418
                );
419
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
420
            }
13✔
421

422
            let block_hashes = self
13✔
423
                .db
13✔
424
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
13✔
425
                .await?;
13✔
426
            debug!(
13✔
427
                target: LOG_TARGET,
×
428
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
429
                offset,
430
                offset + NUM_CHAIN_SPLIT_HEADERS,
×
431
                peer_node_id,
432
                block_hashes.len()
×
433
            );
434

435
            // No further hashes to send.
436
            if block_hashes.is_empty() {
13✔
437
                warn!(
×
438
                    target: LOG_TARGET,
×
439
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
440
                    peer_node_id,
441
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
442
                );
443
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
444
            }
13✔
445

446
            let request = FindChainSplitRequest {
13✔
447
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
56✔
448
                header_count,
13✔
449
            };
450

451
            let resp = match client.find_chain_split(request).await {
13✔
452
                Ok(r) => r,
13✔
453
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
×
454
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
455
                    // send. Exit early in this case.
456
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
×
457
                        warn!(
×
458
                            target: LOG_TARGET,
×
459
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
460
                            peer_node_id,
461
                            NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
462
                        );
463
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
464
                    }
×
465
                    // Chain split not found, let's go further back
466
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
×
467
                    continue;
×
468
                },
469
                Err(err) => {
×
470
                    return Err(err.into());
×
471
                },
472
            };
473
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
13✔
474
                warn!(
×
475
                    target: LOG_TARGET,
×
476
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
477
                    peer_node_id,
478
                    resp.headers.len(),
×
479
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
480
                );
481
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
×
482
            }
13✔
483
            if resp.fork_hash_index >= block_hashes.len() as u64 {
13✔
484
                warn!(
×
485
                    target: LOG_TARGET,
×
486
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
487
                    peer_node_id,
488
                    resp.fork_hash_index,
489
                    block_hashes.len(),
×
490
                );
491
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
×
492
                    block_hashes.len() as u64,
×
493
                    resp.fork_hash_index,
×
494
                ));
×
495
            }
13✔
496
            #[allow(clippy::cast_possible_truncation)]
497
            if !resp.headers.is_empty() &&
13✔
498
                *resp.headers.first().expect("Already checked").prev_hash !=
9✔
499
                    *block_hashes
9✔
500
                        .get(resp.fork_hash_index as usize)
9✔
501
                        .expect("Already checked")
9✔
502
            {
503
                warn!(
×
504
                    target: LOG_TARGET,
×
505
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
506
                    peer_node_id,
507
                    resp.fork_hash_index,
508
                );
509
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
×
510
                    "Peer sent incorrect fork hash index".into(),
×
511
                ));
×
512
            }
13✔
513
            #[allow(clippy::cast_possible_truncation)]
514
            let chain_split_hash = *block_hashes
13✔
515
                .get(resp.fork_hash_index as usize)
13✔
516
                .expect("Already checked");
13✔
517

518
            return Ok(FindChainSplitResult {
13✔
519
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
13✔
520
                peer_headers: resp.headers,
13✔
521
                peer_fork_hash_index: resp.fork_hash_index,
13✔
522
                chain_split_hash,
13✔
523
            });
13✔
524
        }
525
    }
13✔
526

527
    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
528
    /// of the chain split (see [HeaderSyncStatus]).
529
    ///
530
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
531
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
532
    async fn determine_sync_status(
13✔
533
        &mut self,
13✔
534
        sync_peer: &SyncPeer,
13✔
535
        best_header: ChainHeader,
13✔
536
        best_block_header: ChainHeader,
13✔
537
        max_reorg_depth_allowed: usize,
13✔
538
        client: &mut rpc::BaseNodeSyncRpcClient,
13✔
539
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
13✔
540
        // This method will return ban-able errors for certain offenses.
541
        let chain_split_result = self
13✔
542
            .find_chain_split(
13✔
543
                sync_peer.node_id(),
13✔
544
                max_reorg_depth_allowed,
13✔
545
                client,
13✔
546
                HEADER_SYNC_INITIAL_MAX_HEADERS as u64,
13✔
547
            )
13✔
548
            .await?;
13✔
549
        if chain_split_result.reorg_steps_back > 0 {
13✔
550
            debug!(
4✔
551
                target: LOG_TARGET,
×
552
                "Found chain split {} blocks back, received {} headers from peer `{}`",
553
                chain_split_result.reorg_steps_back,
554
                chain_split_result.peer_headers.len(),
×
555
                sync_peer
556
            );
557
        }
9✔
558

559
        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
560
        // accumulated difficulty.
561
        if chain_split_result.peer_headers.is_empty() {
13✔
562
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
563
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
564
            // we can ban the peer.
565
            if best_header.height() == best_block_header.height() {
4✔
566
                warn!(
1✔
567
                    target: LOG_TARGET,
×
568
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
569
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
570
                    sync_peer.node_id(),
×
571
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
572
                    best_block_header.accumulated_data().total_accumulated_difficulty,
×
573
                );
574
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
1✔
575
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
1✔
576
                    actual: None,
1✔
577
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
1✔
578
                });
1✔
579
            }
3✔
580
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
3✔
581
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
3✔
582
        }
9✔
583

584
        let headers = chain_split_result
9✔
585
            .peer_headers
9✔
586
            .clone()
9✔
587
            .into_iter()
9✔
588
            .map(BlockHeader::try_from)
9✔
589
            .collect::<Result<Vec<_>, _>>()
9✔
590
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
9✔
591
        let num_new_headers = headers.len();
9✔
592
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
593
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
594
        // the list
595
        if self
9✔
596
            .db
9✔
597
            .fetch_header_by_block_hash(headers.first().expect("Already checked").hash())
9✔
598
            .await?
9✔
599
            .is_some()
9✔
600
        {
601
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
602
                "Header already in database".to_string(),
×
603
            ));
×
604
        };
9✔
605

606
        self.header_validator
9✔
607
            .initialize_state(&chain_split_result.chain_split_hash)
9✔
608
            .await?;
9✔
609
        for header in headers {
32✔
610
            debug!(
32✔
611
                target: LOG_TARGET,
×
612
                "Validating header #{} (Pow: {}) with hash: ({})",
613
                header.height,
614
                header.pow_algo(),
×
615
                header.hash().to_hex(),
×
616
            );
617
            self.header_validator.validate(header).await?;
32✔
618
        }
619

620
        debug!(
9✔
621
            target: LOG_TARGET,
×
622
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
×
623
        );
624

625
        let chain_split_info = ChainSplitInfo {
9✔
626
            best_block_header,
9✔
627
            reorg_steps_back: chain_split_result.reorg_steps_back,
9✔
628
            chain_split_hash: chain_split_result.chain_split_hash,
9✔
629
        };
9✔
630
        Ok((
9✔
631
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
9✔
632
            chain_split_result,
9✔
633
        ))
9✔
634
    }
13✔
635

636
    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
1✔
637
        debug!(
1✔
638
            target: LOG_TARGET,
×
639
            "Deleting headers that no longer form part of the main chain up until split at {}",
640
            split_hash.to_hex()
×
641
        );
642

643
        let blocks = self.db.rewind_to_hash(split_hash).await?;
1✔
644
        debug!(
1✔
645
            target: LOG_TARGET,
×
646
            "Rewound {} block(s) in preparation for header sync",
647
            blocks.len()
×
648
        );
649
        Ok(blocks)
1✔
650
    }
1✔
651

652
    #[allow(clippy::too_many_lines)]
653
    async fn synchronize_headers(
9✔
654
        &mut self,
9✔
655
        mut sync_peer: SyncPeer,
9✔
656
        client: &mut rpc::BaseNodeSyncRpcClient,
9✔
657
        split_info: ChainSplitInfo,
9✔
658
        max_latency: Duration,
9✔
659
    ) -> Result<(), BlockHeaderSyncError> {
9✔
660
        info!(target: LOG_TARGET, "Starting header sync from peer {sync_peer}");
9✔
661
        const COMMIT_EVERY_N_HEADERS: usize = 1000;
662

663
        let mut has_switched_to_new_chain = false;
9✔
664
        let pending_len = self.header_validator.valid_headers().len();
9✔
665

666
        // Find the hash to start syncing the rest of the headers.
667
        // The expectation cannot fail because there has been at least one valid header returned (checked in
668
        // determine_sync_status)
669
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
9✔
670
            .header_validator
9✔
671
            .current_valid_chain_tip_header()
9✔
672
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
9✔
673
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");
9✔
674

675
        // If we already have a stronger chain at this point, switch over to it.
676
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
677
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);
9✔
678

679
        if has_better_pow {
9✔
680
            debug!(
9✔
681
                target: LOG_TARGET,
×
682
                "Remote chain from peer {} has higher PoW. Switching",
683
                sync_peer.node_id()
×
684
            );
685
            self.switch_to_pending_chain(&split_info).await?;
9✔
686
            has_switched_to_new_chain = true;
9✔
687
        }
×
688

689
        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
9✔
690
            // Peer returned less than the max number of requested headers. This indicates that we have all the
691
            // available headers from the peer.
692
            if !has_better_pow {
9✔
693
                // Because the pow is less or equal than the current chain the peer had to have lied about their pow
694
                debug!(target: LOG_TARGET, "No further headers to download");
×
695
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
696
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
697
                    actual: Some(total_accumulated_difficulty),
×
698
                    local: split_info
×
699
                        .best_block_header
×
700
                        .accumulated_data()
×
701
                        .total_accumulated_difficulty,
×
702
                });
×
703
            }
9✔
704
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
705
            // to block sync.
706
            return Ok(());
9✔
707
        }
×
708

709
        debug!(
×
710
            target: LOG_TARGET,
×
711
            "Download remaining headers starting from header #{} from peer `{}`",
712
            start_header_height,
713
            sync_peer.node_id()
×
714
        );
715
        let request = SyncHeadersRequest {
×
716
            start_hash: start_header_hash.to_vec(),
×
717
            // To the tip!
×
718
            count: 0,
×
719
        };
×
720

721
        let mut header_stream = client.sync_headers(request).await?;
×
722
        debug!(
×
723
            target: LOG_TARGET,
×
724
            "Reading headers from peer `{}`",
725
            sync_peer.node_id()
×
726
        );
727

728
        let mut last_sync_timer = Instant::now();
×
729

730
        let mut last_total_accumulated_difficulty = U512::zero();
×
731
        let mut avg_latency = RollingAverageTime::new(20);
×
732
        let mut prev_height: Option<u64> = None;
×
733
        while let Some(header) = header_stream.next().await {
×
734
            let latency = last_sync_timer.elapsed();
×
735
            avg_latency.add_sample(latency);
×
736
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
×
737
            debug!(
×
738
                target: LOG_TARGET,
×
739
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
740
                header.height,
741
                header.pow_algo(),
×
742
                header.hash().to_hex(),
×
743
                latency
744
            );
745
            trace!(
×
746
                target: LOG_TARGET,
×
747
                "{header}"
748
            );
749
            if let Some(prev_header_height) = prev_height &&
×
750
                header.height != prev_header_height.saturating_add(1)
×
751
            {
752
                warn!(
×
753
                    target: LOG_TARGET,
×
754
                    "Received header #{} `{}` does not follow previous header",
755
                    header.height,
756
                    header.hash().to_hex()
×
757
                );
758
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
759
                    "Header does not follow previous header".to_string(),
×
760
                ));
×
761
            }
×
762
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
×
763
            if let Some(h) = existing_header {
×
764
                warn!(
×
765
                    target: LOG_TARGET,
×
766
                    "Received header #{} `{}` that we already have.",
767
                    h.height,
768
                    h.hash().to_hex()
×
769
                );
770
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
771
                    "Header already in database".to_string(),
×
772
                ));
×
773
            }
×
774
            let current_height = header.height;
×
775
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;
×
776

777
            if has_switched_to_new_chain {
×
778
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
779
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
×
780
                    self.commit_pending_headers().await?;
×
781
                }
×
782
            } else {
783
                // The remote chain has not (yet) been accepted.
784
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
785
                // achieved.
786
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
×
787
                    self.switch_to_pending_chain(&split_info).await?;
×
788
                    has_switched_to_new_chain = true;
×
789
                }
×
790
            }
791

792
            sync_peer.set_latency(latency);
×
793
            sync_peer.add_sample(last_sync_timer.elapsed());
×
794
            self.hooks.call_on_progress_header_hooks(
×
795
                current_height,
×
796
                sync_peer.claimed_chain_metadata().best_block_height(),
×
797
                &sync_peer,
×
798
            );
799

800
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
×
801
            if let Some(avg_latency) = last_avg_latency &&
×
802
                avg_latency > max_latency
×
803
            {
804
                return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
805
                    peer: sync_peer.node_id().clone(),
×
806
                    latency: avg_latency,
×
807
                    max_latency,
×
808
                });
×
809
            }
×
810

811
            last_sync_timer = Instant::now();
×
812
            prev_height = Some(current_height);
×
813
        }
814

815
        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
×
816
        if !has_switched_to_new_chain {
×
817
            let best_local_before_sync = split_info
×
818
                .best_block_header
×
819
                .accumulated_data()
×
820
                .total_accumulated_difficulty;
×
821
            match self
×
822
                .header_validator
×
823
                .current_valid_chain_tip_header()
×
824
                .map(|h| h.accumulated_data().total_accumulated_difficulty)
×
825
            {
826
                Some(validated_total_accumulated_diff) => {
×
827
                    if claimed_total_accumulated_diff > validated_total_accumulated_diff {
×
828
                        // Over-claim: peer advertised more PoW than their headers actually provide.
829
                        return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
830
                            claimed: claimed_total_accumulated_diff,
×
831
                            actual: Some(validated_total_accumulated_diff),
×
832
                            local: best_local_before_sync,
×
833
                        });
×
834
                    } else if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
×
835
                        self.switch_to_pending_chain(&split_info).await?;
×
836
                        has_switched_to_new_chain = true;
×
837
                        info!(
×
838
                            target: LOG_TARGET,
×
839
                            "Received PoW from peer exceeds local tip. Before sync: {}, received: {}. Committed.",
840
                            best_local_before_sync,
841
                            validated_total_accumulated_diff,
842
                        );
843
                    } else {
844
                        // We have a stronger chain, so we do not commit the headers.
845
                        debug!(
×
846
                            target: LOG_TARGET,
×
847
                            "Not committing headers as we have a stronger chain, ours: {} theirs: {}",
848
                            best_local_before_sync,
849
                            validated_total_accumulated_diff,
850
                        );
851
                    };
852
                },
853
                None => {
854
                    // No validated headers at this stage, but there should have been.
855
                    return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
856
                        claimed: claimed_total_accumulated_diff,
×
857
                        actual: None,
×
858
                        local: best_local_before_sync,
×
859
                    });
×
860
                },
861
            }
862
        }
×
863

864
        // Commit the last blocks only if we have switched to the new chain.
865
        if has_switched_to_new_chain && !self.header_validator.valid_headers().is_empty() {
×
866
            self.commit_pending_headers().await?;
×
867
        }
×
868

869
        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
870
        // some other external factor like a disconnect etc), we detect the and ban the peer.
871
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
×
872
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
873
                claimed: claimed_total_accumulated_diff,
×
874
                actual: Some(last_total_accumulated_difficulty),
×
875
                local: split_info
×
876
                    .best_block_header
×
877
                    .accumulated_data()
×
878
                    .total_accumulated_difficulty,
×
879
            });
×
880
        }
×
881

882
        Ok(())
×
883
    }
9✔
884

885
    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
9✔
886
        let chain_headers = self.header_validator.take_valid_headers();
9✔
887
        let num_headers = chain_headers.len();
9✔
888
        let start = Instant::now();
9✔
889

890
        let new_tip = chain_headers.last().cloned().unwrap();
9✔
891
        let mut txn = self.db.write_transaction();
9✔
892
        chain_headers.into_iter().for_each(|chain_header| {
32✔
893
            txn.insert_chain_header(chain_header);
32✔
894
        });
32✔
895

896
        txn.commit().await?;
9✔
897

898
        debug!(
9✔
899
            target: LOG_TARGET,
×
900
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
901
            num_headers,
902
            new_tip.height(),
×
903
            start.elapsed()
×
904
        );
905

906
        Ok(new_tip)
9✔
907
    }
9✔
908

909
    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
9✔
910
        let chain_headers = self.header_validator.valid_headers();
9✔
911
        if chain_headers.is_empty() {
9✔
912
            return false;
×
913
        }
9✔
914

915
        // Check that the remote tip is stronger than the local tip, equal should not have ended up here, so we treat
916
        // equal as less
917
        let proposed_tip = chain_headers.last().unwrap();
9✔
918
        self.header_validator.compare_chains(current_tip, proposed_tip).is_lt()
9✔
919
    }
9✔
920

921
    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
9✔
922
        // Reorg if required
923
        if split_info.reorg_steps_back > 0 {
9✔
924
            debug!(
1✔
925
                target: LOG_TARGET,
×
926
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
927
                split_info.reorg_steps_back,
928
                split_info.chain_split_hash.to_hex()
×
929
            );
930
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
1✔
931
            if !blocks.is_empty() {
1✔
932
                self.hooks.call_on_rewind_hooks(blocks);
1✔
933
            }
1✔
934
        }
8✔
935

936
        // Commit the forked chain. At this point
937
        // 1. Headers have been validated
938
        // 2. The forked chain has a higher PoW than the local chain
939
        //
940
        // After this we commit headers every `n` blocks
941
        self.commit_pending_headers().await?;
9✔
942

943
        Ok(())
9✔
944
    }
9✔
945

946
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
947
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
5✔
948
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
5✔
949
            self.sync_peers.remove(pos);
5✔
950
        }
5✔
951
    }
5✔
952

953
    // Helper function to get the index to the node_id inside of the vec of peers
954
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
17✔
955
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
17✔
956
    }
17✔
957
}
958

959
#[derive(Debug, Clone)]
960
struct FindChainSplitResult {
961
    reorg_steps_back: u64,
962
    peer_headers: Vec<ProtoBlockHeader>,
963
    peer_fork_hash_index: u64,
964
    chain_split_hash: HashOutput,
965
}
966

967
/// Information about the chain split from the remote node.
968
#[derive(Debug, Clone, PartialEq)]
969
pub struct ChainSplitInfo {
970
    /// The best block's header on the local chain.
971
    pub best_block_header: ChainHeader,
972
    /// The number of blocks to reorg back to the fork.
973
    pub reorg_steps_back: u64,
974
    /// The hash of the block at the fork.
975
    pub chain_split_hash: HashOutput,
976
}
977

978
/// The result of an attempt to synchronize headers with a peer.
979
#[derive(Debug, Clone, PartialEq)]
980
pub struct AttemptSyncResult {
981
    /// The number of headers that were returned.
982
    pub headers_returned: u64,
983
    /// The fork hash index of the remote peer.
984
    pub peer_fork_hash_index: u64,
985
    /// The header sync status.
986
    pub header_sync_status: HeaderSyncStatus,
987
}
988

989
#[derive(Debug, Clone, PartialEq)]
990
pub enum HeaderSyncStatus {
991
    /// Local and remote node are in sync or ahead
992
    InSyncOrAhead,
993
    /// Local node is lagging behind remote node
994
    Lagging(Box<ChainSplitInfo>),
995
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc