• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In
Build has been canceled!

tari-project / tari / 21441807570

28 Jan 2026 02:18PM UTC coverage: 61.781% (-0.1%) from 61.907%
21441807570

push

github

web-flow
fix: user pay for fee and replace by fee (#7662)

Description
---
Fixes user pay for fee and replace by fee wallet calls
Fixes base node settings
Fixes some cucumber tests

0 of 19 new or added lines in 4 files covered. (0.0%)

513 existing lines in 28 files now uncovered.

70633 of 114328 relevant lines covered (61.78%)

228392.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.0
/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    sync::Arc,
24
    time::{Duration, Instant},
25
};
26

27
use futures::StreamExt;
28
use log::*;
29
use primitive_types::U512;
30
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
31
use tari_comms::{
32
    connectivity::ConnectivityRequester,
33
    peer_manager::NodeId,
34
    protocol::rpc::{RpcClient, RpcError},
35
    PeerConnection,
36
};
37
use tari_node_components::blocks::{BlockHeader, ChainBlock, ChainHeader};
38
use tari_transaction_components::BanPeriod;
39
use tari_utilities::hex::Hex;
40

41
pub(crate) use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
42
use crate::{
43
    base_node::sync::{
44
        ban::PeerBanManager,
45
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
46
        hooks::Hooks,
47
        rpc,
48
        BlockchainSyncConfig,
49
        SyncPeer,
50
    },
51
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
52
    common::rolling_avg::RollingAverageTime,
53
    consensus::BaseNodeConsensusManager,
54
    proof_of_work::randomx_factory::RandomXFactory,
55
    proto::{
56
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
57
        core::BlockHeader as ProtoBlockHeader,
58
    },
59
};
60

61
/// Log target used by the `log` macros in this module.
const LOG_TARGET: &str = "c::bn::header_sync";

/// Maximum number of times `synchronize` will relax the allowed sync latency
/// before giving up with `AllSyncPeersExceedLatency`.
const MAX_LATENCY_INCREASES: usize = 5;
64

65
/// Synchronizes the local header chain against a set of candidate sync peers.
pub struct HeaderSynchronizer<'a, B> {
    // Sync-wide configuration (latency limits, ban periods, RPC deadlines, etc.).
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    // Validates headers received from peers against consensus rules.
    header_validator: BlockHeaderSyncValidator<B>,
    connectivity: ConnectivityRequester,
    // Shared candidate peer list; peers that fail (other than on latency) are
    // removed from this list during a sync round.
    sync_peers: &'a mut Vec<SyncPeer>,
    // Caller-registered callbacks for start/progress/rewind events.
    hooks: Hooks,
    // Chain metadata snapshot captured by the caller at the start of the round,
    // used as a consistency check against current DB state.
    local_cached_metadata: &'a ChainMetadata,
    peer_ban_manager: PeerBanManager,
}
75

76
impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
77
    pub fn new(
13✔
78
        config: BlockchainSyncConfig,
13✔
79
        db: AsyncBlockchainDb<B>,
13✔
80
        consensus_rules: BaseNodeConsensusManager,
13✔
81
        connectivity: ConnectivityRequester,
13✔
82
        sync_peers: &'a mut Vec<SyncPeer>,
13✔
83
        randomx_factory: RandomXFactory,
13✔
84
        local_metadata: &'a ChainMetadata,
13✔
85
    ) -> Self {
13✔
86
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
13✔
87
        Self {
13✔
88
            config,
13✔
89
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
13✔
90
            db,
13✔
91
            connectivity,
13✔
92
            sync_peers,
13✔
93
            hooks: Default::default(),
13✔
94
            local_cached_metadata: local_metadata,
13✔
95
            peer_ban_manager,
13✔
96
        }
13✔
97
    }
13✔
98

99
    /// Register a hook invoked once, with the selected sync peer, just before a
    /// sync attempt against that peer begins.
    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }
13✔
103

104
    /// Register a hook that receives header sync progress updates.
    // NOTE(review): based on the call in `attempt_sync`, the arguments appear to
    // be (local height at the chain split, peer's claimed best height, peer) —
    // confirm against the Hooks definition.
    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_progress_header_hook(hook);
    }
13✔
108

109
    /// Register a hook that is called with the blocks removed from the main
    /// chain whenever the local chain is rewound to a split point.
    pub fn on_rewind<H>(&mut self, hook: H)
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
        self.hooks.add_on_rewind_hook(hook);
    }
13✔
113

114
    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
13✔
115
        debug!(target: LOG_TARGET, "Starting header sync.",);
13✔
116

117
        info!(
13✔
118
            target: LOG_TARGET,
×
119
            "Synchronizing headers ({} candidate peers selected)",
120
            self.sync_peers.len()
×
121
        );
122
        let mut max_latency = self.config.initial_max_sync_latency;
13✔
123
        let mut latency_increases_counter = 0;
13✔
124
        loop {
125
            match self.try_sync_from_all_peers(max_latency).await {
13✔
126
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
12✔
127
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
×
128
                    // If we have few sync peers, throw this out to be retried later
129
                    if self.sync_peers.len() < 2 {
×
130
                        return Err(err);
×
131
                    }
×
132
                    max_latency += self.config.max_latency_increase;
×
133
                    latency_increases_counter += 1;
×
134
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
135
                        return Err(err);
×
136
                    }
×
137
                },
138
                Err(err) => break Err(err),
1✔
139
            }
140
        }
141
    }
13✔
142

143
    #[allow(clippy::too_many_lines)]
144
    pub async fn try_sync_from_all_peers(
13✔
145
        &mut self,
13✔
146
        max_latency: Duration,
13✔
147
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
13✔
148
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
13✔
149
        info!(
13✔
150
            target: LOG_TARGET,
×
151
            "Attempting to sync headers ({} sync peers)",
152
            sync_peer_node_ids.len()
×
153
        );
154
        let mut latency_counter = 0usize;
13✔
155
        for node_id in sync_peer_node_ids {
13✔
156
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
13✔
157
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
12✔
158
                Err(err) => {
1✔
159
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
1✔
160
                    if let Some(reason) = ban_reason {
1✔
161
                        warn!(target: LOG_TARGET, "{err}");
1✔
162
                        let duration = match reason.ban_duration {
1✔
163
                            BanPeriod::Short => self.config.short_ban_period,
1✔
164
                            BanPeriod::Long => self.config.ban_period,
×
165
                        };
166
                        self.peer_ban_manager
1✔
167
                            .ban_peer_if_required(&node_id, reason.reason, duration)
1✔
168
                            .await;
1✔
UNCOV
169
                    }
×
170
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
1✔
171
                        latency_counter += 1;
×
172
                    } else {
1✔
173
                        self.remove_sync_peer(&node_id);
1✔
174
                    }
1✔
175
                },
176
            }
177
        }
178

179
        if self.sync_peers.is_empty() {
1✔
180
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
1✔
181
        } else if latency_counter >= self.sync_peers.len() {
×
182
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
×
183
        } else {
184
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
×
185
        }
186
    }
13✔
187

188
    async fn connect_and_attempt_sync(
13✔
189
        &mut self,
13✔
190
        node_id: &NodeId,
13✔
191
        max_latency: Duration,
13✔
192
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
13✔
193
        let peer_index = self
13✔
194
            .get_sync_peer_index(node_id)
13✔
195
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
13✔
196
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
13✔
197
        self.hooks.call_on_starting_hook(sync_peer);
13✔
198

199
        let mut conn = self.dial_sync_peer(node_id).await?;
13✔
200
        debug!(
13✔
201
            target: LOG_TARGET,
×
202
            "Attempting to synchronize headers with `{node_id}`"
203
        );
204

205
        let config = RpcClient::builder()
13✔
206
            .with_deadline(self.config.rpc_deadline)
13✔
207
            .with_deadline_grace_period(Duration::from_secs(5));
13✔
208
        let mut client = conn
13✔
209
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
13✔
210
            .await?;
13✔
211

212
        let latency = client
13✔
213
            .get_last_request_latency()
13✔
214
            .expect("unreachable panic: last request latency must be set after connect");
13✔
215
        self.sync_peers
13✔
216
            .get_mut(peer_index)
13✔
217
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
13✔
218
            .set_latency(latency);
13✔
219
        if latency > max_latency {
13✔
220
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
221
                peer: conn.peer_node_id().clone(),
×
222
                latency,
×
223
                max_latency,
×
224
            });
×
225
        }
13✔
226

227
        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
13✔
228
        let sync_peer = self
13✔
229
            .sync_peers
13✔
230
            .get(peer_index)
13✔
231
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
13✔
232
            .clone();
13✔
233
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
13✔
234
        Ok((sync_peer, sync_result))
12✔
235
    }
13✔
236

237
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
13✔
238
        let timer = Instant::now();
13✔
239
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
13✔
240
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
13✔
241
        info!(
13✔
242
            target: LOG_TARGET,
×
243
            "Successfully dialed sync peer {} in {:.2?}",
244
            node_id,
245
            timer.elapsed()
×
246
        );
247
        Ok(conn)
13✔
248
    }
13✔
249

250
    async fn attempt_sync(
13✔
251
        &mut self,
13✔
252
        sync_peer: &SyncPeer,
13✔
253
        mut client: rpc::BaseNodeSyncRpcClient,
13✔
254
        max_latency: Duration,
13✔
255
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
13✔
256
        let latency = client.get_last_request_latency();
13✔
257
        debug!(
13✔
258
            target: LOG_TARGET,
×
259
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
260
            sync_peer.node_id(),
×
261
            latency.unwrap_or_default().as_millis()
×
262
        );
263

264
        // Fetch best local data at the beginning of the sync process
265
        let best_block_metadata = self.db.get_chain_metadata().await?;
13✔
266
        let best_header = self.db.fetch_last_chain_header().await?;
13✔
267
        let best_block_header = self
13✔
268
            .db
13✔
269
            .fetch_chain_header(best_block_metadata.best_block_height())
13✔
270
            .await?;
13✔
271
        let best_header_height = best_header.height();
13✔
272
        let best_block_height = best_block_header.height();
13✔
273

274
        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
13✔
275
        {
276
            return Err(BlockHeaderSyncError::ChainStorageError(
×
277
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
×
278
            ));
×
279
        }
13✔
280

281
        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
282
        //   peer, but they claimed better POW before we attempted sync.
283
        // - This method will return ban-able errors for certain offenses.
284
        let (header_sync_status, peer_response) = self
13✔
285
            .determine_sync_status(
13✔
286
                sync_peer,
13✔
287
                best_header.clone(),
13✔
288
                best_block_header.clone(),
13✔
289
                self.config.max_reorg_depth_allowed,
13✔
290
                &mut client,
13✔
291
            )
13✔
292
            .await?;
13✔
293

294
        match header_sync_status.clone() {
12✔
295
            HeaderSyncStatus::InSyncOrAhead => {
296
                debug!(
3✔
297
                    target: LOG_TARGET,
×
298
                    "Headers are in sync at height {best_header_height} but tip is {best_block_height}. Proceeding to archival/pruned block sync"
299
                );
300

301
                Ok(AttemptSyncResult {
3✔
302
                    headers_returned: peer_response.peer_headers.len() as u64,
3✔
303
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
3✔
304
                    header_sync_status,
3✔
305
                })
3✔
306
            },
307
            HeaderSyncStatus::Lagging(split_info) => {
9✔
308
                self.hooks.call_on_progress_header_hooks(
9✔
309
                    split_info
9✔
310
                        .best_block_header
9✔
311
                        .height()
9✔
312
                        .checked_sub(split_info.reorg_steps_back)
9✔
313
                        .unwrap_or_default(),
9✔
314
                    sync_peer.claimed_chain_metadata().best_block_height(),
9✔
315
                    sync_peer,
9✔
316
                );
317
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
9✔
318
                    .await?;
9✔
319
                Ok(AttemptSyncResult {
9✔
320
                    headers_returned: peer_response.peer_headers.len() as u64,
9✔
321
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
9✔
322
                    header_sync_status,
9✔
323
                })
9✔
324
            },
325
        }
326
    }
13✔
327

328
    #[allow(clippy::too_many_lines)]
329
    async fn find_chain_split(
13✔
330
        &mut self,
13✔
331
        peer_node_id: &NodeId,
13✔
332
        max_reorg_depth_allowed: usize,
13✔
333
        client: &mut rpc::BaseNodeSyncRpcClient,
13✔
334
        header_count: u64,
13✔
335
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
13✔
336
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
337
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
338
        // and keep us busy going back until the genesis.
339
        // 20 x 500 = max 10,000 block split can be detected. The 10_000 limit is default, but can be overridden
340
        let max_chain_split_iters = max_reorg_depth_allowed.saturating_div(NUM_CHAIN_SPLIT_HEADERS);
13✔
341

342
        let mut offset = 0;
13✔
343
        let mut iter_count = 0;
13✔
344
        loop {
345
            iter_count += 1;
13✔
346
            if iter_count > max_chain_split_iters {
13✔
347
                warn!(
×
348
                    target: LOG_TARGET,
×
349
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
350
                    peer_node_id,
351
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
352
                );
353
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
354
            }
13✔
355

356
            let block_hashes = self
13✔
357
                .db
13✔
358
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
13✔
359
                .await?;
13✔
360
            debug!(
13✔
361
                target: LOG_TARGET,
×
362
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
363
                offset,
364
                offset + NUM_CHAIN_SPLIT_HEADERS,
×
365
                peer_node_id,
366
                block_hashes.len()
×
367
            );
368

369
            // No further hashes to send.
370
            if block_hashes.is_empty() {
13✔
371
                warn!(
×
372
                    target: LOG_TARGET,
×
373
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
374
                    peer_node_id,
375
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
376
                );
377
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
378
            }
13✔
379

380
            let request = FindChainSplitRequest {
13✔
381
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
56✔
382
                header_count,
13✔
383
            };
384

385
            let resp = match client.find_chain_split(request).await {
13✔
386
                Ok(r) => r,
13✔
387
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
×
388
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
389
                    // send. Exit early in this case.
390
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
×
391
                        warn!(
×
392
                            target: LOG_TARGET,
×
393
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
394
                            peer_node_id,
395
                            NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
396
                        );
397
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
398
                    }
×
399
                    // Chain split not found, let's go further back
400
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
×
401
                    continue;
×
402
                },
403
                Err(err) => {
×
404
                    return Err(err.into());
×
405
                },
406
            };
407
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
13✔
408
                warn!(
×
409
                    target: LOG_TARGET,
×
410
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
411
                    peer_node_id,
412
                    resp.headers.len(),
×
413
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
414
                );
415
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
×
416
            }
13✔
417
            if resp.fork_hash_index >= block_hashes.len() as u64 {
13✔
418
                warn!(
×
419
                    target: LOG_TARGET,
×
420
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
421
                    peer_node_id,
422
                    resp.fork_hash_index,
423
                    block_hashes.len(),
×
424
                );
425
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
×
426
                    block_hashes.len() as u64,
×
427
                    resp.fork_hash_index,
×
428
                ));
×
429
            }
13✔
430
            #[allow(clippy::cast_possible_truncation)]
431
            if !resp.headers.is_empty() &&
13✔
432
                *resp.headers.first().expect("Already checked").prev_hash !=
9✔
433
                    *block_hashes
9✔
434
                        .get(resp.fork_hash_index as usize)
9✔
435
                        .expect("Already checked")
9✔
436
            {
437
                warn!(
×
438
                    target: LOG_TARGET,
×
439
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
440
                    peer_node_id,
441
                    resp.fork_hash_index,
442
                );
443
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
×
444
                    "Peer sent incorrect fork hash index".into(),
×
445
                ));
×
446
            }
13✔
447
            #[allow(clippy::cast_possible_truncation)]
448
            let chain_split_hash = *block_hashes
13✔
449
                .get(resp.fork_hash_index as usize)
13✔
450
                .expect("Already checked");
13✔
451

452
            return Ok(FindChainSplitResult {
13✔
453
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
13✔
454
                peer_headers: resp.headers,
13✔
455
                peer_fork_hash_index: resp.fork_hash_index,
13✔
456
                chain_split_hash,
13✔
457
            });
13✔
458
        }
459
    }
13✔
460

461
    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
    /// of the chain split (see [HeaderSyncStatus]).
    ///
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
    ///
    /// Ban-able errors are returned when the peer sends no headers despite
    /// claiming a better chain, sends headers we already have, or sends headers
    /// that fail validation.
    async fn determine_sync_status(
        &mut self,
        sync_peer: &SyncPeer,
        best_header: ChainHeader,
        best_block_header: ChainHeader,
        max_reorg_depth_allowed: usize,
        client: &mut rpc::BaseNodeSyncRpcClient,
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
        // This method will return ban-able errors for certain offenses.
        let chain_split_result = self
            .find_chain_split(
                sync_peer.node_id(),
                max_reorg_depth_allowed,
                client,
                HEADER_SYNC_INITIAL_MAX_HEADERS as u64,
            )
            .await?;
        if chain_split_result.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Found chain split {} blocks back, received {} headers from peer `{}`",
                chain_split_result.reorg_steps_back,
                chain_split_result.peer_headers.len(),
                sync_peer
            );
        }

        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
        // accumulated difficulty.
        if chain_split_result.peer_headers.is_empty() {
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
            // we can ban the peer.
            if best_header.height() == best_block_header.height() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
                    sync_peer.node_id(),
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    best_block_header.accumulated_data().total_accumulated_difficulty,
                );
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: None,
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
                });
            }
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
        }

        // Convert the protobuf headers; any malformed header is a ban-able offense.
        let headers = chain_split_result
            .peer_headers
            .clone()
            .into_iter()
            .map(BlockHeader::try_from)
            .collect::<Result<Vec<_>, _>>()
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
        let num_new_headers = headers.len();
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
        // the list
        if self
            .db
            .fetch_header_by_block_hash(headers.first().expect("Already checked").hash())
            .await?
            .is_some()
        {
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                "Header already in database".to_string(),
            ));
        };

        // Prime the validator at the split point, then validate each received
        // header in order; validation failures propagate as errors.
        self.header_validator
            .initialize_state(&chain_split_result.chain_split_hash)
            .await?;
        for header in headers {
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({})",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
            );
            self.header_validator.validate(header).await?;
        }

        debug!(
            target: LOG_TARGET,
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
        );

        let chain_split_info = ChainSplitInfo {
            best_block_header,
            reorg_steps_back: chain_split_result.reorg_steps_back,
            chain_split_hash: chain_split_result.chain_split_hash,
        };
        Ok((
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
            chain_split_result,
        ))
    }
13✔
569

570
    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
1✔
571
        debug!(
1✔
572
            target: LOG_TARGET,
×
573
            "Deleting headers that no longer form part of the main chain up until split at {}",
574
            split_hash.to_hex()
×
575
        );
576

577
        let blocks = self.db.rewind_to_hash(split_hash).await?;
1✔
578
        debug!(
1✔
579
            target: LOG_TARGET,
×
580
            "Rewound {} block(s) in preparation for header sync",
581
            blocks.len()
×
582
        );
583
        Ok(blocks)
1✔
584
    }
1✔
585

586
    #[allow(clippy::too_many_lines)]
587
    async fn synchronize_headers(
9✔
588
        &mut self,
9✔
589
        mut sync_peer: SyncPeer,
9✔
590
        client: &mut rpc::BaseNodeSyncRpcClient,
9✔
591
        split_info: ChainSplitInfo,
9✔
592
        max_latency: Duration,
9✔
593
    ) -> Result<(), BlockHeaderSyncError> {
9✔
594
        info!(target: LOG_TARGET, "Starting header sync from peer {sync_peer}");
9✔
595
        const COMMIT_EVERY_N_HEADERS: usize = 1000;
596

597
        let mut has_switched_to_new_chain = false;
9✔
598
        let pending_len = self.header_validator.valid_headers().len();
9✔
599

600
        // Find the hash to start syncing the rest of the headers.
601
        // The expectation cannot fail because there has been at least one valid header returned (checked in
602
        // determine_sync_status)
603
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
9✔
604
            .header_validator
9✔
605
            .current_valid_chain_tip_header()
9✔
606
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
9✔
607
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");
9✔
608

609
        // If we already have a stronger chain at this point, switch over to it.
610
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
611
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);
9✔
612

613
        if has_better_pow {
9✔
614
            debug!(
9✔
615
                target: LOG_TARGET,
×
616
                "Remote chain from peer {} has higher PoW. Switching",
617
                sync_peer.node_id()
×
618
            );
619
            self.switch_to_pending_chain(&split_info).await?;
9✔
620
            has_switched_to_new_chain = true;
9✔
621
        }
×
622

623
        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
9✔
624
            // Peer returned less than the max number of requested headers. This indicates that we have all the
625
            // available headers from the peer.
626
            if !has_better_pow {
9✔
627
                // Because the pow is less or equal than the current chain the peer had to have lied about their pow
628
                debug!(target: LOG_TARGET, "No further headers to download");
×
629
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
630
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
631
                    actual: Some(total_accumulated_difficulty),
×
632
                    local: split_info
×
633
                        .best_block_header
×
634
                        .accumulated_data()
×
635
                        .total_accumulated_difficulty,
×
636
                });
×
637
            }
9✔
638
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
639
            // to block sync.
640
            return Ok(());
9✔
641
        }
×
642

643
        debug!(
×
644
            target: LOG_TARGET,
×
645
            "Download remaining headers starting from header #{} from peer `{}`",
646
            start_header_height,
647
            sync_peer.node_id()
×
648
        );
649
        let request = SyncHeadersRequest {
×
650
            start_hash: start_header_hash.to_vec(),
×
651
            // To the tip!
×
652
            count: 0,
×
653
        };
×
654

655
        let mut header_stream = client.sync_headers(request).await?;
×
656
        debug!(
×
657
            target: LOG_TARGET,
×
658
            "Reading headers from peer `{}`",
659
            sync_peer.node_id()
×
660
        );
661

662
        let mut last_sync_timer = Instant::now();
×
663

664
        let mut last_total_accumulated_difficulty = U512::zero();
×
665
        let mut avg_latency = RollingAverageTime::new(20);
×
666
        let mut prev_height: Option<u64> = None;
×
667
        while let Some(header) = header_stream.next().await {
×
668
            let latency = last_sync_timer.elapsed();
×
669
            avg_latency.add_sample(latency);
×
670
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
×
671
            debug!(
×
672
                target: LOG_TARGET,
×
673
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
674
                header.height,
675
                header.pow_algo(),
×
676
                header.hash().to_hex(),
×
677
                latency
678
            );
679
            trace!(
×
680
                target: LOG_TARGET,
×
681
                "{header}"
682
            );
683
            if let Some(prev_header_height) = prev_height {
×
684
                if header.height != prev_header_height.saturating_add(1) {
×
685
                    warn!(
×
686
                        target: LOG_TARGET,
×
687
                        "Received header #{} `{}` does not follow previous header",
688
                        header.height,
689
                        header.hash().to_hex()
×
690
                    );
691
                    return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
692
                        "Header does not follow previous header".to_string(),
×
693
                    ));
×
694
                }
×
695
            }
×
696
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
×
697
            if let Some(h) = existing_header {
×
698
                warn!(
×
699
                    target: LOG_TARGET,
×
700
                    "Received header #{} `{}` that we already have.",
701
                    h.height,
702
                    h.hash().to_hex()
×
703
                );
704
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
705
                    "Header already in database".to_string(),
×
706
                ));
×
707
            }
×
708
            let current_height = header.height;
×
709
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;
×
710

711
            if has_switched_to_new_chain {
×
712
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
713
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
×
714
                    self.commit_pending_headers().await?;
×
715
                }
×
716
            } else {
717
                // The remote chain has not (yet) been accepted.
718
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
719
                // achieved.
720
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
×
721
                    self.switch_to_pending_chain(&split_info).await?;
×
722
                    has_switched_to_new_chain = true;
×
723
                }
×
724
            }
725

726
            sync_peer.set_latency(latency);
×
727
            sync_peer.add_sample(last_sync_timer.elapsed());
×
728
            self.hooks.call_on_progress_header_hooks(
×
729
                current_height,
×
730
                sync_peer.claimed_chain_metadata().best_block_height(),
×
731
                &sync_peer,
×
732
            );
733

734
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
×
735
            if let Some(avg_latency) = last_avg_latency {
×
736
                if avg_latency > max_latency {
×
737
                    return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
738
                        peer: sync_peer.node_id().clone(),
×
739
                        latency: avg_latency,
×
740
                        max_latency,
×
741
                    });
×
742
                }
×
743
            }
×
744

745
            last_sync_timer = Instant::now();
×
746
            prev_height = Some(current_height);
×
747
        }
748

749
        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
×
750
        if !has_switched_to_new_chain {
×
751
            let best_local_before_sync = split_info
×
752
                .best_block_header
×
753
                .accumulated_data()
×
754
                .total_accumulated_difficulty;
×
755
            match self
×
756
                .header_validator
×
757
                .current_valid_chain_tip_header()
×
758
                .map(|h| h.accumulated_data().total_accumulated_difficulty)
×
759
            {
760
                Some(validated_total_accumulated_diff) => {
×
761
                    if claimed_total_accumulated_diff > validated_total_accumulated_diff {
×
762
                        // Over-claim: peer advertised more PoW than their headers actually provide.
763
                        return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
764
                            claimed: claimed_total_accumulated_diff,
×
765
                            actual: Some(validated_total_accumulated_diff),
×
766
                            local: best_local_before_sync,
×
767
                        });
×
768
                    } else if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
×
769
                        self.switch_to_pending_chain(&split_info).await?;
×
770
                        has_switched_to_new_chain = true;
×
771
                        info!(
×
772
                            target: LOG_TARGET,
×
773
                            "Received PoW from peer exceeds local tip. Before sync: {}, received: {}. Committed.",
774
                            best_local_before_sync,
775
                            validated_total_accumulated_diff,
776
                        );
777
                    } else {
778
                        // We have a stronger chain, so we do not commit the headers.
779
                        debug!(
×
780
                            target: LOG_TARGET,
×
781
                            "Not committing headers as we have a stronger chain, ours: {} theirs: {}",
782
                            best_local_before_sync,
783
                            validated_total_accumulated_diff,
784
                        );
785
                    };
786
                },
787
                None => {
788
                    // No validated headers at this stage, but there should have been.
789
                    return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
790
                        claimed: claimed_total_accumulated_diff,
×
791
                        actual: None,
×
792
                        local: best_local_before_sync,
×
793
                    });
×
794
                },
795
            }
796
        }
×
797

798
        // Commit the last blocks only if we have switched to the new chain.
799
        if has_switched_to_new_chain && !self.header_validator.valid_headers().is_empty() {
×
800
            self.commit_pending_headers().await?;
×
801
        }
×
802

803
        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
804
        // some other external factor like a disconnect etc), we detect the and ban the peer.
805
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
×
806
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
807
                claimed: claimed_total_accumulated_diff,
×
808
                actual: Some(last_total_accumulated_difficulty),
×
809
                local: split_info
×
810
                    .best_block_header
×
811
                    .accumulated_data()
×
812
                    .total_accumulated_difficulty,
×
813
            });
×
814
        }
×
815

816
        Ok(())
×
817
    }
9✔
818

819
    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
9✔
820
        let chain_headers = self.header_validator.take_valid_headers();
9✔
821
        let num_headers = chain_headers.len();
9✔
822
        let start = Instant::now();
9✔
823

824
        let new_tip = chain_headers.last().cloned().unwrap();
9✔
825
        let mut txn = self.db.write_transaction();
9✔
826
        chain_headers.into_iter().for_each(|chain_header| {
32✔
827
            txn.insert_chain_header(chain_header);
32✔
828
        });
32✔
829

830
        txn.commit().await?;
9✔
831

832
        debug!(
9✔
833
            target: LOG_TARGET,
×
834
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
835
            num_headers,
836
            new_tip.height(),
×
837
            start.elapsed()
×
838
        );
839

840
        Ok(new_tip)
9✔
841
    }
9✔
842

843
    /// Returns true when the pending (validated) remote chain has strictly more
    /// accumulated proof-of-work than `current_tip`.
    ///
    /// An empty pending chain is never "higher". A tie is also treated as not
    /// higher: equal chains should not have reached this point, so equality is
    /// deliberately counted as less.
    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
        match self.header_validator.valid_headers().last() {
            None => false,
            Some(proposed_tip) => self
                .header_validator
                .compare_chains(current_tip, proposed_tip)
                .is_lt(),
        }
    }
9✔
854

855
    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
9✔
856
        // Reorg if required
857
        if split_info.reorg_steps_back > 0 {
9✔
858
            debug!(
1✔
859
                target: LOG_TARGET,
×
860
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
861
                split_info.reorg_steps_back,
862
                split_info.chain_split_hash.to_hex()
×
863
            );
864
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
1✔
865
            if !blocks.is_empty() {
1✔
866
                self.hooks.call_on_rewind_hooks(blocks);
1✔
867
            }
1✔
868
        }
8✔
869

870
        // Commit the forked chain. At this point
871
        // 1. Headers have been validated
872
        // 2. The forked chain has a higher PoW than the local chain
873
        //
874
        // After this we commit headers every `n` blocks
875
        self.commit_pending_headers().await?;
9✔
876

877
        Ok(())
9✔
878
    }
9✔
879

880
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
881
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
1✔
882
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
1✔
883
            self.sync_peers.remove(pos);
1✔
884
        }
1✔
885
    }
1✔
886

887
    // Helper function to get the index to the node_id inside of the vec of peers
888
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
13✔
889
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
13✔
890
    }
13✔
891
}
892

893
/// Raw result of a chain-split query against a remote peer.
#[derive(Debug, Clone)]
struct FindChainSplitResult {
    // The number of local blocks to rewind to reach the split point.
    reorg_steps_back: u64,
    // Headers returned by the peer.
    // NOTE(review): presumably the headers following the split hash — confirm
    // against the find-chain-split RPC contract.
    peer_headers: Vec<ProtoBlockHeader>,
    // The fork hash index reported by the remote peer.
    // NOTE(review): inferred from the field name and `AttemptSyncResult`; verify
    // which of the locally submitted hashes this indexes into.
    peer_fork_hash_index: u64,
    // The hash of the block at the chain split (common ancestor of both chains).
    chain_split_hash: HashOutput,
}
900

901
/// Information about the chain split from the remote node, used to decide
/// whether and how far to reorg the local chain during header sync.
#[derive(Debug, Clone, PartialEq)]
pub struct ChainSplitInfo {
    /// The best block's header on the local chain.
    pub best_block_header: ChainHeader,
    /// The number of blocks to reorg back to the fork.
    pub reorg_steps_back: u64,
    /// The hash of the block at the fork.
    pub chain_split_hash: HashOutput,
}
911

912
/// The result of an attempt to synchronize headers with a peer.
#[derive(Debug, Clone, PartialEq)]
pub struct AttemptSyncResult {
    /// The number of headers that were returned.
    pub headers_returned: u64,
    /// The fork hash index of the remote peer.
    pub peer_fork_hash_index: u64,
    /// The header sync status.
    pub header_sync_status: HeaderSyncStatus,
}
922

923
#[derive(Debug, Clone, PartialEq)]
pub enum HeaderSyncStatus {
    /// Local and remote node are in sync or ahead
    InSyncOrAhead,
    /// Local node is lagging behind remote node.
    /// The split info is boxed — presumably to keep the enum small, since
    /// `ChainSplitInfo` embeds a full `ChainHeader`.
    Lagging(Box<ChainSplitInfo>),
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc