
tari-project / tari / 20742611763

06 Jan 2026 08:28AM UTC coverage: 60.745% (-0.03%) from 60.777%
fix(node): add missing data to get_active_validator_nodes (#7645)

Description
---
fix(node): add missing data to get_active_validator_nodes
fix(storage/vns): key bug in get_all_vns call

Motivation and Context
---
Additional information is required from the base layer's get active validators call.

How Has This Been Tested?
---

What process can a PR reviewer use to test or verify this change?
---

Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify


30 of 41 new or added lines in 5 files covered. (73.17%)

61 existing lines in 15 files now uncovered.

70500 of 116059 relevant lines covered (60.74%)

299672.34 hits per line

Source File

/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs (53.11% covered)
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    sync::Arc,
24
    time::{Duration, Instant},
25
};
26

27
use futures::StreamExt;
28
use log::*;
29
use primitive_types::U512;
30
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
31
use tari_comms::{
32
    connectivity::ConnectivityRequester,
33
    peer_manager::NodeId,
34
    protocol::rpc::{RpcClient, RpcError},
35
    PeerConnection,
36
};
37
use tari_node_components::blocks::{BlockHeader, ChainBlock, ChainHeader};
38
use tari_transaction_components::BanPeriod;
39
use tari_utilities::hex::Hex;
40

41
pub(crate) use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
42
use crate::{
43
    base_node::sync::{
44
        ban::PeerBanManager,
45
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
46
        hooks::Hooks,
47
        rpc,
48
        BlockchainSyncConfig,
49
        SyncPeer,
50
    },
51
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
52
    common::rolling_avg::RollingAverageTime,
53
    consensus::BaseNodeConsensusManager,
54
    proof_of_work::randomx_factory::RandomXFactory,
55
    proto::{
56
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
57
        core::BlockHeader as ProtoBlockHeader,
58
    },
59
};
60

61
const LOG_TARGET: &str = "c::bn::header_sync";
62

63
const MAX_LATENCY_INCREASES: usize = 5;
64

65
pub struct HeaderSynchronizer<'a, B> {
66
    config: BlockchainSyncConfig,
67
    db: AsyncBlockchainDb<B>,
68
    header_validator: BlockHeaderSyncValidator<B>,
69
    connectivity: ConnectivityRequester,
70
    sync_peers: &'a mut Vec<SyncPeer>,
71
    hooks: Hooks,
72
    local_cached_metadata: &'a ChainMetadata,
73
    peer_ban_manager: PeerBanManager,
74
}
75

76
impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
77
    pub fn new(
12✔
78
        config: BlockchainSyncConfig,
12✔
79
        db: AsyncBlockchainDb<B>,
12✔
80
        consensus_rules: BaseNodeConsensusManager,
12✔
81
        connectivity: ConnectivityRequester,
12✔
82
        sync_peers: &'a mut Vec<SyncPeer>,
12✔
83
        randomx_factory: RandomXFactory,
12✔
84
        local_metadata: &'a ChainMetadata,
12✔
85
    ) -> Self {
12✔
86
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
12✔
87
        Self {
12✔
88
            config,
12✔
89
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
12✔
90
            db,
12✔
91
            connectivity,
12✔
92
            sync_peers,
12✔
93
            hooks: Default::default(),
12✔
94
            local_cached_metadata: local_metadata,
12✔
95
            peer_ban_manager,
12✔
96
        }
12✔
97
    }
12✔
98

99
    pub fn on_starting<H>(&mut self, hook: H)
12✔
100
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
12✔
101
        self.hooks.add_on_starting_hook(hook);
12✔
102
    }
12✔
103

104
    pub fn on_progress<H>(&mut self, hook: H)
12✔
105
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
12✔
106
        self.hooks.add_on_progress_header_hook(hook);
12✔
107
    }
12✔
108

109
    pub fn on_rewind<H>(&mut self, hook: H)
12✔
110
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
12✔
111
        self.hooks.add_on_rewind_hook(hook);
12✔
112
    }
12✔
113

114
    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
12✔
115
        debug!(target: LOG_TARGET, "Starting header sync.",);
12✔
116

117
        info!(
12✔
118
            target: LOG_TARGET,
×
119
            "Synchronizing headers ({} candidate peers selected)",
×
120
            self.sync_peers.len()
×
121
        );
122
        let mut max_latency = self.config.initial_max_sync_latency;
12✔
123
        let mut latency_increases_counter = 0;
12✔
124
        loop {
125
            match self.try_sync_from_all_peers(max_latency).await {
12✔
126
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
4✔
127
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
×
128
                    // If we have few sync peers, throw this out to be retried later
129
                    if self.sync_peers.len() < 2 {
×
130
                        return Err(err);
×
131
                    }
×
132
                    max_latency += self.config.max_latency_increase;
×
133
                    latency_increases_counter += 1;
×
134
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
135
                        return Err(err);
×
136
                    }
×
137
                },
138
                Err(err) => break Err(err),
8✔
139
            }
140
        }
141
    }
12✔
142

143
    #[allow(clippy::too_many_lines)]
144
    pub async fn try_sync_from_all_peers(
12✔
145
        &mut self,
12✔
146
        max_latency: Duration,
12✔
147
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
12✔
148
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
12✔
149
        info!(
12✔
150
            target: LOG_TARGET,
×
151
            "Attempting to sync headers ({} sync peers)",
×
152
            sync_peer_node_ids.len()
×
153
        );
154
        let mut latency_counter = 0usize;
12✔
155
        for node_id in sync_peer_node_ids {
20✔
156
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
12✔
157
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
4✔
158
                Err(err) => {
8✔
159
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
8✔
160
                    if let Some(reason) = ban_reason {
8✔
UNCOV
161
                        warn!(target: LOG_TARGET, "{err}");
×
UNCOV
162
                        let duration = match reason.ban_duration {
×
UNCOV
163
                            BanPeriod::Short => self.config.short_ban_period,
×
164
                            BanPeriod::Long => self.config.ban_period,
×
165
                        };
UNCOV
166
                        self.peer_ban_manager
×
UNCOV
167
                            .ban_peer_if_required(&node_id, reason.reason, duration)
×
UNCOV
168
                            .await;
×
169
                    }
8✔
170
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
8✔
171
                        latency_counter += 1;
×
172
                    } else {
8✔
173
                        self.remove_sync_peer(&node_id);
8✔
174
                    }
8✔
175
                },
176
            }
177
        }
178

179
        if self.sync_peers.is_empty() {
8✔
180
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
8✔
181
        } else if latency_counter >= self.sync_peers.len() {
×
182
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
×
183
        } else {
184
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
×
185
        }
186
    }
12✔
187

188
    async fn connect_and_attempt_sync(
12✔
189
        &mut self,
12✔
190
        node_id: &NodeId,
12✔
191
        max_latency: Duration,
12✔
192
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
12✔
193
        let peer_index = self
12✔
194
            .get_sync_peer_index(node_id)
12✔
195
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
12✔
196
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
12✔
197
        self.hooks.call_on_starting_hook(sync_peer);
12✔
198

199
        let mut conn = self.dial_sync_peer(node_id).await?;
12✔
200
        debug!(
4✔
201
            target: LOG_TARGET,
×
202
            "Attempting to synchronize headers with `{node_id}`"
×
203
        );
204

205
        let config = RpcClient::builder()
4✔
206
            .with_deadline(self.config.rpc_deadline)
4✔
207
            .with_deadline_grace_period(Duration::from_secs(5));
4✔
208
        let mut client = conn
4✔
209
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
4✔
210
            .await?;
4✔
211

212
        let latency = client
4✔
213
            .get_last_request_latency()
4✔
214
            .expect("unreachable panic: last request latency must be set after connect");
4✔
215
        self.sync_peers
4✔
216
            .get_mut(peer_index)
4✔
217
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
4✔
218
            .set_latency(latency);
4✔
219
        if latency > max_latency {
4✔
220
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
221
                peer: conn.peer_node_id().clone(),
×
222
                latency,
×
223
                max_latency,
×
224
            });
×
225
        }
4✔
226

227
        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
4✔
228
        let sync_peer = self
4✔
229
            .sync_peers
4✔
230
            .get(peer_index)
4✔
231
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
4✔
232
            .clone();
4✔
233
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
4✔
234
        Ok((sync_peer, sync_result))
4✔
235
    }
12✔
236

237
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
12✔
238
        let timer = Instant::now();
12✔
239
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
12✔
240
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
12✔
241
        info!(
4✔
242
            target: LOG_TARGET,
×
243
            "Successfully dialed sync peer {} in {:.2?}",
×
244
            node_id,
245
            timer.elapsed()
×
246
        );
247
        Ok(conn)
4✔
248
    }
12✔
249

250
    async fn attempt_sync(
4✔
251
        &mut self,
4✔
252
        sync_peer: &SyncPeer,
4✔
253
        mut client: rpc::BaseNodeSyncRpcClient,
4✔
254
        max_latency: Duration,
4✔
255
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
4✔
256
        let latency = client.get_last_request_latency();
4✔
257
        debug!(
4✔
258
            target: LOG_TARGET,
×
259
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
×
260
            sync_peer.node_id(),
×
261
            latency.unwrap_or_default().as_millis()
×
262
        );
263

264
        // Fetch best local data at the beginning of the sync process
265
        let best_block_metadata = self.db.get_chain_metadata().await?;
4✔
266
        let best_header = self.db.fetch_last_chain_header().await?;
4✔
267
        let best_block_header = self
4✔
268
            .db
4✔
269
            .fetch_chain_header(best_block_metadata.best_block_height())
4✔
270
            .await?;
4✔
271
        let best_header_height = best_header.height();
4✔
272
        let best_block_height = best_block_header.height();
4✔
273

274
        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
4✔
275
        {
276
            return Err(BlockHeaderSyncError::ChainStorageError(
×
277
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
×
278
            ));
×
279
        }
4✔
280

281
        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or fewer headers (Lagging) than the
282
        //   peer, but they claimed better POW before we attempted sync.
283
        // - This method will return ban-able errors for certain offenses.
284
        let (header_sync_status, peer_response) = self
4✔
285
            .determine_sync_status(
4✔
286
                sync_peer,
4✔
287
                best_header.clone(),
4✔
288
                best_block_header.clone(),
4✔
289
                self.config.max_reorg_depth_allowed,
4✔
290
                &mut client,
4✔
291
            )
4✔
292
            .await?;
4✔
293

294
        match header_sync_status.clone() {
4✔
295
            HeaderSyncStatus::InSyncOrAhead => {
296
                debug!(
1✔
297
                    target: LOG_TARGET,
×
298
                    "Headers are in sync at height {best_header_height} but tip is {best_block_height}. Proceeding to archival/pruned block sync"
×
299
                );
300

301
                Ok(AttemptSyncResult {
1✔
302
                    headers_returned: peer_response.peer_headers.len() as u64,
1✔
303
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
1✔
304
                    header_sync_status,
1✔
305
                })
1✔
306
            },
307
            HeaderSyncStatus::Lagging(split_info) => {
3✔
308
                self.hooks.call_on_progress_header_hooks(
3✔
309
                    split_info
3✔
310
                        .best_block_header
3✔
311
                        .height()
3✔
312
                        .checked_sub(split_info.reorg_steps_back)
3✔
313
                        .unwrap_or_default(),
3✔
314
                    sync_peer.claimed_chain_metadata().best_block_height(),
3✔
315
                    sync_peer,
3✔
316
                );
317
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
3✔
318
                    .await?;
3✔
319
                Ok(AttemptSyncResult {
3✔
320
                    headers_returned: peer_response.peer_headers.len() as u64,
3✔
321
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
3✔
322
                    header_sync_status,
3✔
323
                })
3✔
324
            },
325
        }
326
    }
4✔
327

328
    #[allow(clippy::too_many_lines)]
329
    async fn find_chain_split(
4✔
330
        &mut self,
4✔
331
        peer_node_id: &NodeId,
4✔
332
        max_reorg_depth_allowed: usize,
4✔
333
        client: &mut rpc::BaseNodeSyncRpcClient,
4✔
334
        header_count: u64,
4✔
335
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
4✔
336
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
337
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
338
        // and keep us busy going back until the genesis.
339
        // 20 x 500 = a chain split of at most 10,000 blocks back can be detected. The 10_000 limit is the default, but can be overridden
340
        let max_chain_split_iters = max_reorg_depth_allowed.saturating_div(NUM_CHAIN_SPLIT_HEADERS);
4✔
341

342
        let mut offset = 0;
4✔
343
        let mut iter_count = 0;
4✔
344
        loop {
345
            iter_count += 1;
4✔
346
            if iter_count > max_chain_split_iters {
4✔
347
                warn!(
×
348
                    target: LOG_TARGET,
×
349
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
350
                    peer_node_id,
351
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
352
                );
353
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
354
            }
4✔
355

356
            let block_hashes = self
4✔
357
                .db
4✔
358
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
4✔
359
                .await?;
4✔
360
            debug!(
4✔
361
                target: LOG_TARGET,
×
362
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
×
363
                offset,
364
                offset + NUM_CHAIN_SPLIT_HEADERS,
×
365
                peer_node_id,
366
                block_hashes.len()
×
367
            );
368

369
            // No further hashes to send.
370
            if block_hashes.is_empty() {
4✔
371
                warn!(
×
372
                    target: LOG_TARGET,
×
373
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
374
                    peer_node_id,
375
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
376
                );
377
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
378
            }
4✔
379

380
            let request = FindChainSplitRequest {
4✔
381
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
6✔
382
                header_count,
4✔
383
            };
384

385
            let resp = match client.find_chain_split(request).await {
4✔
386
                Ok(r) => r,
4✔
387
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
×
388
                    // This round we sent fewer hashes than the max, so the next round will not have any more hashes to
389
                    // send. Exit early in this case.
390
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
×
391
                        warn!(
×
392
                            target: LOG_TARGET,
×
393
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
394
                            peer_node_id,
395
                            NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
396
                        );
397
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
398
                    }
×
399
                    // Chain split not found, let's go further back
400
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
×
401
                    continue;
×
402
                },
UNCOV
403
                Err(err) => {
×
UNCOV
404
                    return Err(err.into());
×
405
                },
406
            };
407
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
4✔
408
                warn!(
×
409
                    target: LOG_TARGET,
×
410
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
×
411
                    peer_node_id,
412
                    resp.headers.len(),
×
413
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
414
                );
415
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
×
416
            }
4✔
417
            if resp.fork_hash_index >= block_hashes.len() as u64 {
4✔
418
                warn!(
×
419
                    target: LOG_TARGET,
×
420
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
×
421
                    peer_node_id,
422
                    resp.fork_hash_index,
423
                    block_hashes.len(),
×
424
                );
425
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
×
426
                    block_hashes.len() as u64,
×
427
                    resp.fork_hash_index,
×
428
                ));
×
429
            }
4✔
430
            #[allow(clippy::cast_possible_truncation)]
431
            if !resp.headers.is_empty() &&
4✔
432
                *resp.headers.first().expect("Already checked").prev_hash !=
3✔
433
                    *block_hashes
3✔
434
                        .get(resp.fork_hash_index as usize)
3✔
435
                        .expect("Already checked")
3✔
436
            {
437
                warn!(
×
438
                    target: LOG_TARGET,
×
439
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
×
440
                    peer_node_id,
441
                    resp.fork_hash_index,
442
                );
443
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
×
444
                    "Peer sent incorrect fork hash index".into(),
×
445
                ));
×
446
            }
4✔
447
            #[allow(clippy::cast_possible_truncation)]
448
            let chain_split_hash = *block_hashes
4✔
449
                .get(resp.fork_hash_index as usize)
4✔
450
                .expect("Already checked");
4✔
451

452
            return Ok(FindChainSplitResult {
4✔
453
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
4✔
454
                peer_headers: resp.headers,
4✔
455
                peer_fork_hash_index: resp.fork_hash_index,
4✔
456
                chain_split_hash,
4✔
457
            });
4✔
458
        }
459
    }
4✔
460

461
    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
462
    /// of the chain split (see [HeaderSyncStatus]).
463
    ///
464
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
465
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
466
    async fn determine_sync_status(
4✔
467
        &mut self,
4✔
468
        sync_peer: &SyncPeer,
4✔
469
        best_header: ChainHeader,
4✔
470
        best_block_header: ChainHeader,
4✔
471
        max_reorg_depth_allowed: usize,
4✔
472
        client: &mut rpc::BaseNodeSyncRpcClient,
4✔
473
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
4✔
474
        // This method will return ban-able errors for certain offenses.
475
        let chain_split_result = self
4✔
476
            .find_chain_split(
4✔
477
                sync_peer.node_id(),
4✔
478
                max_reorg_depth_allowed,
4✔
479
                client,
4✔
480
                HEADER_SYNC_INITIAL_MAX_HEADERS as u64,
4✔
481
            )
4✔
482
            .await?;
4✔
483
        if chain_split_result.reorg_steps_back > 0 {
4✔
484
            debug!(
×
485
                target: LOG_TARGET,
×
486
                "Found chain split {} blocks back, received {} headers from peer `{}`",
×
487
                chain_split_result.reorg_steps_back,
488
                chain_split_result.peer_headers.len(),
×
489
                sync_peer
490
            );
491
        }
4✔
492

493
        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
494
        // accumulated difficulty.
495
        if chain_split_result.peer_headers.is_empty() {
4✔
496
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
497
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
498
            // we can ban the peer.
499
            if best_header.height() == best_block_header.height() {
1✔
500
                warn!(
×
501
                    target: LOG_TARGET,
×
502
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
×
503
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
×
504
                    sync_peer.node_id(),
×
505
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
506
                    best_block_header.accumulated_data().total_accumulated_difficulty,
×
507
                );
508
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
509
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
510
                    actual: None,
×
511
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
×
512
                });
×
513
            }
1✔
514
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
1✔
515
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
1✔
516
        }
3✔
517

518
        let headers = chain_split_result
3✔
519
            .peer_headers
3✔
520
            .clone()
3✔
521
            .into_iter()
3✔
522
            .map(BlockHeader::try_from)
3✔
523
            .collect::<Result<Vec<_>, _>>()
3✔
524
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
3✔
525
        let num_new_headers = headers.len();
3✔
526
        // Do a cheap check to verify that we do not already have this series of headers in the db - if the first one is
527
        // not there, most probably the rest are not either - the peer could still have returned old headers later on in
528
        // the list
529
        if self
3✔
530
            .db
3✔
531
            .fetch_header_by_block_hash(headers.first().expect("Already checked").hash())
3✔
532
            .await?
3✔
533
            .is_some()
3✔
534
        {
535
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
536
                "Header already in database".to_string(),
×
537
            ));
×
538
        };
3✔
539

540
        self.header_validator
3✔
541
            .initialize_state(&chain_split_result.chain_split_hash)
3✔
542
            .await?;
3✔
543
        for header in headers {
15✔
544
            debug!(
12✔
545
                target: LOG_TARGET,
×
546
                "Validating header #{} (Pow: {}) with hash: ({})",
×
547
                header.height,
548
                header.pow_algo(),
×
549
                header.hash().to_hex(),
×
550
            );
551
            self.header_validator.validate(header).await?;
12✔
552
        }
553

554
        debug!(
3✔
555
            target: LOG_TARGET,
×
556
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
×
557
        );
558

559
        let chain_split_info = ChainSplitInfo {
3✔
560
            best_block_header,
3✔
561
            reorg_steps_back: chain_split_result.reorg_steps_back,
3✔
562
            chain_split_hash: chain_split_result.chain_split_hash,
3✔
563
        };
3✔
564
        Ok((
3✔
565
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
3✔
566
            chain_split_result,
3✔
567
        ))
3✔
568
    }
4✔
569

570
    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
×
571
        debug!(
×
572
            target: LOG_TARGET,
×
573
            "Deleting headers that no longer form part of the main chain up until split at {}",
×
574
            split_hash.to_hex()
×
575
        );
576

577
        let blocks = self.db.rewind_to_hash(split_hash).await?;
×
578
        debug!(
×
579
            target: LOG_TARGET,
×
580
            "Rewound {} block(s) in preparation for header sync",
×
581
            blocks.len()
×
582
        );
583
        Ok(blocks)
×
584
    }
×
585

586
    #[allow(clippy::too_many_lines)]
587
    async fn synchronize_headers(
3✔
588
        &mut self,
3✔
589
        mut sync_peer: SyncPeer,
3✔
590
        client: &mut rpc::BaseNodeSyncRpcClient,
3✔
591
        split_info: ChainSplitInfo,
3✔
592
        max_latency: Duration,
3✔
593
    ) -> Result<(), BlockHeaderSyncError> {
3✔
594
        info!(target: LOG_TARGET, "Starting header sync from peer {sync_peer}");
3✔
595
        const COMMIT_EVERY_N_HEADERS: usize = 1000;
596

597
        let mut has_switched_to_new_chain = false;
3✔
598
        let pending_len = self.header_validator.valid_headers().len();
3✔
599

600
        // Find the hash to start syncing the rest of the headers.
601
        // The expectation cannot fail because there has been at least one valid header returned (checked in
602
        // determine_sync_status)
603
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
3✔
604
            .header_validator
3✔
605
            .current_valid_chain_tip_header()
3✔
606
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
3✔
607
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");
3✔
608

609
        // If we already have a stronger chain at this point, switch over to it.
610
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
611
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);
3✔
612

613
        if has_better_pow {
3✔
614
            debug!(
3✔
615
                target: LOG_TARGET,
×
616
                "Remote chain from peer {} has higher PoW. Switching",
×
617
                sync_peer.node_id()
×
618
            );
619
            self.switch_to_pending_chain(&split_info).await?;
3✔
620
            has_switched_to_new_chain = true;
3✔
621
        }
×
622

623
        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
3✔
624
            // Peer returned less than the max number of requested headers. This indicates that we have all the
625
            // available headers from the peer.
626
            if !has_better_pow {
3✔
627
                // Because the pow is less than or equal to the current chain's, the peer had to have lied about their pow
628
                debug!(target: LOG_TARGET, "No further headers to download");
×
629
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
630
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
631
                    actual: Some(total_accumulated_difficulty),
×
632
                    local: split_info
×
633
                        .best_block_header
×
634
                        .accumulated_data()
×
635
                        .total_accumulated_difficulty,
×
636
                });
×
637
            }
3✔
638
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
639
            // to block sync.
640
            return Ok(());
3✔
641
        }
×
642

643
        debug!(
×
644
            target: LOG_TARGET,
×
645
            "Download remaining headers starting from header #{} from peer `{}`",
×
646
            start_header_height,
647
            sync_peer.node_id()
×
648
        );
649
        let request = SyncHeadersRequest {
×
650
            start_hash: start_header_hash.to_vec(),
×
651
            // To the tip!
×
652
            count: 0,
×
653
        };
×
654

655
        let mut header_stream = client.sync_headers(request).await?;
×
656
        debug!(
×
657
            target: LOG_TARGET,
×
658
            "Reading headers from peer `{}`",
×
659
            sync_peer.node_id()
×
660
        );
661

662
        let mut last_sync_timer = Instant::now();
×
663

664
        let mut last_total_accumulated_difficulty = U512::zero();
×
665
        let mut avg_latency = RollingAverageTime::new(20);
×
666
        let mut prev_height: Option<u64> = None;
×
667
        while let Some(header) = header_stream.next().await {
×
668
            let latency = last_sync_timer.elapsed();
×
669
            avg_latency.add_sample(latency);
×
670
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
×
671
            debug!(
×
672
                target: LOG_TARGET,
×
673
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
×
674
                header.height,
675
                header.pow_algo(),
×
676
                header.hash().to_hex(),
×
677
                latency
678
            );
679
            trace!(
×
680
                target: LOG_TARGET,
×
681
                "{header}"
×
682
            );
683
            if let Some(prev_header_height) = prev_height {
×
684
                if header.height != prev_header_height.saturating_add(1) {
×
685
                    warn!(
×
686
                        target: LOG_TARGET,
×
687
                        "Received header #{} `{}` does not follow previous header",
×
688
                        header.height,
689
                        header.hash().to_hex()
×
690
                    );
691
                    return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
692
                        "Header does not follow previous header".to_string(),
×
693
                    ));
×
694
                }
×
695
            }
×
696
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
×
697
            if let Some(h) = existing_header {
×
698
                warn!(
×
699
                    target: LOG_TARGET,
×
700
                    "Received header #{} `{}` that we already have.",
×
701
                    h.height,
702
                    h.hash().to_hex()
×
703
                );
704
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
705
                    "Header already in database".to_string(),
×
706
                ));
×
707
            }
×
708
            let current_height = header.height;
×
709
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;
×
710

711
            if has_switched_to_new_chain {
×
712
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
713
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
×
714
                    self.commit_pending_headers().await?;
×
715
                }
×
716
            } else {
717
                // The remote chain has not (yet) been accepted.
718
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
719
                // achieved.
720
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
×
721
                    self.switch_to_pending_chain(&split_info).await?;
×
722
                    has_switched_to_new_chain = true;
×
723
                }
×
724
            }
725

726
            sync_peer.set_latency(latency);
×
727
            sync_peer.add_sample(last_sync_timer.elapsed());
×
728
            self.hooks.call_on_progress_header_hooks(
×
729
                current_height,
×
730
                sync_peer.claimed_chain_metadata().best_block_height(),
×
731
                &sync_peer,
×
732
            );
733

734
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
×
735
            if let Some(avg_latency) = last_avg_latency {
×
736
                if avg_latency > max_latency {
×
737
                    return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
738
                        peer: sync_peer.node_id().clone(),
×
739
                        latency: avg_latency,
×
740
                        max_latency,
×
741
                    });
×
742
                }
×
743
            }
×
744

745
            last_sync_timer = Instant::now();
×
746
            prev_height = Some(current_height);
×
747
        }
748

749
        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
×
750
        if !has_switched_to_new_chain {
×
751
            let best_local_before_sync = split_info
×
752
                .best_block_header
×
753
                .accumulated_data()
×
754
                .total_accumulated_difficulty;
×
755
            match self
×
756
                .header_validator
×
757
                .current_valid_chain_tip_header()
×
758
                .map(|h| h.accumulated_data().total_accumulated_difficulty)
×
759
            {
760
                Some(validated_total_accumulated_diff) => {
×
761
                    if claimed_total_accumulated_diff > validated_total_accumulated_diff {
×
762
                        // Over-claim: peer advertised more PoW than their headers actually provide.
763
                        return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
764
                            claimed: claimed_total_accumulated_diff,
×
765
                            actual: Some(validated_total_accumulated_diff),
×
766
                            local: best_local_before_sync,
×
767
                        });
×
768
                    } else if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
×
769
                        self.switch_to_pending_chain(&split_info).await?;
×
770
                        has_switched_to_new_chain = true;
×
771
                        info!(
×
772
                            target: LOG_TARGET,
×
773
                            "Received PoW from peer exceeds local tip. Before sync: {}, received: {}. Committed.",
×
774
                            best_local_before_sync,
775
                            validated_total_accumulated_diff,
776
                        );
777
                    } else {
778
                        // We have a stronger chain, so we do not commit the headers.
779
                        debug!(
×
780
                            target: LOG_TARGET,
×
781
                            "Not committing headers as we have a stronger chain, ours: {} theirs: {}",
×
782
                            best_local_before_sync,
783
                            validated_total_accumulated_diff,
784
                        );
785
                    };
786
                },
787
                None => {
788
                    // No validated headers at this stage, but there should have been.
789
                    return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
790
                        claimed: claimed_total_accumulated_diff,
×
791
                        actual: None,
×
792
                        local: best_local_before_sync,
×
793
                    });
×
794
                },
795
            }
796
        }
×
797

798
        // Commit the last blocks only if we have switched to the new chain.
799
        if has_switched_to_new_chain && !self.header_validator.valid_headers().is_empty() {
×
800
            self.commit_pending_headers().await?;
×
801
        }
×
802

803
        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
804
        // some other external factor like a disconnect etc), we detect this and ban the peer.
805
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
×
806
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
807
                claimed: claimed_total_accumulated_diff,
×
808
                actual: Some(last_total_accumulated_difficulty),
×
809
                local: split_info
×
810
                    .best_block_header
×
811
                    .accumulated_data()
×
812
                    .total_accumulated_difficulty,
×
813
            });
×
814
        }
×
815

816
        Ok(())
×
817
    }
3✔
818

819
    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
3✔
820
        let chain_headers = self.header_validator.take_valid_headers();
3✔
821
        let num_headers = chain_headers.len();
3✔
822
        let start = Instant::now();
3✔
823

824
        let new_tip = chain_headers.last().cloned().unwrap();
3✔
825
        let mut txn = self.db.write_transaction();
3✔
826
        chain_headers.into_iter().for_each(|chain_header| {
12✔
827
            txn.insert_chain_header(chain_header);
12✔
828
        });
12✔
829

830
        txn.commit().await?;
3✔
831

832
        debug!(
3✔
833
            target: LOG_TARGET,
×
834
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
×
835
            num_headers,
836
            new_tip.height(),
×
837
            start.elapsed()
×
838
        );
839

840
        Ok(new_tip)
3✔
841
    }
3✔
842

843
    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
3✔
844
        let chain_headers = self.header_validator.valid_headers();
3✔
845
        if chain_headers.is_empty() {
3✔
846
            return false;
×
847
        }
3✔
848

849
        // Check that the remote tip is stronger than the local tip; an equal tip should not have ended up here, so we treat
850
        // equal as less
851
        let proposed_tip = chain_headers.last().unwrap();
3✔
852
        self.header_validator.compare_chains(current_tip, proposed_tip).is_lt()
3✔
853
    }
3✔
854

855
    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
3✔
856
        // Reorg if required
857
        if split_info.reorg_steps_back > 0 {
3✔
858
            debug!(
×
859
                target: LOG_TARGET,
×
860
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
×
861
                split_info.reorg_steps_back,
862
                split_info.chain_split_hash.to_hex()
×
863
            );
864
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
×
865
            if !blocks.is_empty() {
×
866
                self.hooks.call_on_rewind_hooks(blocks);
×
867
            }
×
868
        }
3✔
869

870
        // Commit the forked chain. At this point
871
        // 1. Headers have been validated
872
        // 2. The forked chain has a higher PoW than the local chain
873
        //
874
        // After this we commit headers every `n` blocks
875
        self.commit_pending_headers().await?;
3✔
876

877
        Ok(())
3✔
878
    }
3✔
879

880
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
881
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
8✔
882
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
8✔
883
            self.sync_peers.remove(pos);
8✔
884
        }
8✔
885
    }
8✔
886

887
    // Helper function to get the index to the node_id inside of the vec of peers
888
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
12✔
889
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
12✔
890
    }
12✔
891
}
892

893
#[derive(Debug, Clone)]
894
struct FindChainSplitResult {
895
    reorg_steps_back: u64,
896
    peer_headers: Vec<ProtoBlockHeader>,
897
    peer_fork_hash_index: u64,
898
    chain_split_hash: HashOutput,
899
}
900

901
/// Information about the chain split from the remote node.
902
#[derive(Debug, Clone, PartialEq)]
903
pub struct ChainSplitInfo {
904
    /// The best block's header on the local chain.
905
    pub best_block_header: ChainHeader,
906
    /// The number of blocks to reorg back to the fork.
907
    pub reorg_steps_back: u64,
908
    /// The hash of the block at the fork.
909
    pub chain_split_hash: HashOutput,
910
}
911

912
/// The result of an attempt to synchronize headers with a peer.
913
#[derive(Debug, Clone, PartialEq)]
914
pub struct AttemptSyncResult {
915
    /// The number of headers that were returned.
916
    pub headers_returned: u64,
917
    /// The fork hash index of the remote peer.
918
    pub peer_fork_hash_index: u64,
919
    /// The header sync status.
920
    pub header_sync_status: HeaderSyncStatus,
921
}
922

923
#[derive(Debug, Clone, PartialEq)]
924
pub enum HeaderSyncStatus {
925
    /// Local and remote node are in sync or ahead
926
    InSyncOrAhead,
927
    /// Local node is lagging behind remote node
928
    Lagging(Box<ChainSplitInfo>),
929
}
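
Below is a minimal illustrative sketch, not part of synchronizer.rs or of the coverage report, of how a caller might interpret the AttemptSyncResult returned by HeaderSynchronizer::synchronize(). It assumes it lives alongside the types defined above, with tari_utilities::hex::Hex in scope as in this file; the helper name describe_sync_outcome is hypothetical.

// Hypothetical helper: summarize a header-sync attempt using only the
// AttemptSyncResult, HeaderSyncStatus and ChainSplitInfo types defined above.
fn describe_sync_outcome(result: &AttemptSyncResult) -> String {
    match &result.header_sync_status {
        // Local headers already match or exceed the peer's chain.
        HeaderSyncStatus::InSyncOrAhead => {
            format!("in sync or ahead ({} header(s) returned)", result.headers_returned)
        },
        // Local chain is behind: report how far back the chain split sits.
        HeaderSyncStatus::Lagging(split) => format!(
            "lagging: chain split {} block(s) back at {}",
            split.reorg_steps_back,
            split.chain_split_hash.to_hex()
        ),
    }
}

Note that Lagging boxes its ChainSplitInfo, which keeps the enum itself small; the box is dereferenced transparently when reading its fields, as shown.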