tari-project / tari / 15120110241

19 May 2025 06:08PM UTC coverage: 73.213% (-0.06%) from 73.269%

push · github · web-flow
feat!: add second tari only randomx mining (#7057)

Description
---

Adds a second RandomX mining algorithm option that mines Tari only.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Introduced distinct support for Monero RandomX and Tari RandomX proof-of-work algorithms with separate difficulty tracking, hash rate reporting, and block template caching.
  - Added a new VM key field in block results to enhance mining and validation processes.
  - Extended miner configuration and mining logic to support multiple proof-of-work algorithms, including Tari RandomX.

- **Bug Fixes**
  - Improved difficulty and hash rate accuracy by separating Monero and Tari RandomX calculations and metrics.

- **Refactor**
  - Renamed and split data structures, enums, protobuf messages, and methods to differentiate between Monero and Tari RandomX.
  - Updated consensus, validation, and chain strength comparison to handle multiple RandomX variants.
  - Migrated accumulated difficulty representations from 256-bit to 512-bit integers for enhanced precision (see the sketch after this summary).
  - Generalized difficulty window handling to support multiple proof-of-work algorithms dynamically.

- **Documentation**
  - Clarified comments and field descriptions to reflect the distinction between Monero and Tari RandomX algorithms.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
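
To make the 256-bit to 512-bit accumulated difficulty change concrete, here is a minimal, hedged sketch built on `primitive_types::U512` (the type this PR's `synchronizer.rs` imports). The `ChainTip` struct and function names are illustrative assumptions, not the types actually used in the codebase; only the `U512` API usage is taken as given.

```rust
// Minimal sketch of 512-bit accumulated difficulty tracking, assuming
// primitive_types::U512. ChainTip, accumulate and remote_is_stronger are
// hypothetical names for illustration only.
use primitive_types::U512;

/// Hypothetical chain tip summary carrying a 512-bit accumulated difficulty.
#[derive(Debug, Clone, Copy)]
struct ChainTip {
    height: u64,
    total_accumulated_difficulty: U512,
}

/// Add the difficulty achieved by one more header, saturating instead of wrapping.
fn accumulate(tip: ChainTip, achieved_difficulty: u64) -> ChainTip {
    ChainTip {
        height: tip.height + 1,
        total_accumulated_difficulty: tip
            .total_accumulated_difficulty
            .checked_add(U512::from(achieved_difficulty))
            .unwrap_or(U512::MAX),
    }
}

/// Chain strength comparison: strictly greater wins; an equal tip is treated as
/// "not stronger", mirroring `pending_chain_has_higher_pow` in synchronizer.rs.
fn remote_is_stronger(local: &ChainTip, remote: &ChainTip) -> bool {
    remote.total_accumulated_difficulty > local.total_accumulated_difficulty
}

fn main() {
    let mut local = ChainTip { height: 0, total_accumulated_difficulty: U512::zero() };
    let mut remote = local;
    local = accumulate(local, 1_000);
    remote = accumulate(accumulate(remote, 900), 200);
    println!("remote stronger: {}", remote_is_stronger(&local, &remote));
}
```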

170 of 371 new or added lines in 26 files covered. (45.82%)

40 existing lines in 12 files now uncovered.

82064 of 112089 relevant lines covered (73.21%)

274996.0 hits per line

Source File

/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs (54.37% covered)
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::{
    convert::TryFrom,
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use primitive_types::U512;
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
use tari_comms::{
    connectivity::ConnectivityRequester,
    peer_manager::NodeId,
    protocol::rpc::{RpcClient, RpcError},
    PeerConnection,
};
use tari_utilities::hex::Hex;

use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
use crate::{
    base_node::sync::{
        ban::PeerBanManager,
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
        hooks::Hooks,
        rpc,
        BlockchainSyncConfig,
        SyncPeer,
    },
    blocks::{BlockHeader, ChainBlock, ChainHeader},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
    common::{rolling_avg::RollingAverageTime, BanPeriod},
    consensus::ConsensusManager,
    proof_of_work::randomx_factory::RandomXFactory,
    proto::{
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
        core::BlockHeader as ProtoBlockHeader,
    },
};

const LOG_TARGET: &str = "c::bn::header_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct HeaderSynchronizer<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    header_validator: BlockHeaderSyncValidator<B>,
    connectivity: ConnectivityRequester,
    sync_peers: &'a mut Vec<SyncPeer>,
    hooks: Hooks,
    local_cached_metadata: &'a ChainMetadata,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        consensus_rules: ConsensusManager,
        connectivity: ConnectivityRequester,
        sync_peers: &'a mut Vec<SyncPeer>,
        randomx_factory: RandomXFactory,
        local_metadata: &'a ChainMetadata,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            config,
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
            db,
            connectivity,
            sync_peers,
            hooks: Default::default(),
            local_cached_metadata: local_metadata,
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_progress_header_hook(hook);
    }

    pub fn on_rewind<H>(&mut self, hook: H)
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
        self.hooks.add_on_rewind_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        debug!(target: LOG_TARGET, "Starting header sync.",);

        info!(
            target: LOG_TARGET,
            "Synchronizing headers ({} candidate peers selected)",
            self.sync_peers.len()
        );
        let mut max_latency = self.config.initial_max_sync_latency;
        let mut latency_increases_counter = 0;
        loop {
            match self.try_sync_from_all_peers(max_latency).await {
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
                    // If we have few sync peers, throw this out to be retried later
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    }
                    max_latency += self.config.max_latency_increase;
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                },
                Err(err) => break Err(err),
            }
        }
    }

    #[allow(clippy::too_many_lines)]
    pub async fn try_sync_from_all_peers(
        &mut self,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync headers ({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
                Err(err) => {
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
                    if let Some(reason) = ban_reason {
                        warn!(target: LOG_TARGET, "{}", err);
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
        }
    }

    async fn connect_and_attempt_sync(
        &mut self,
        node_id: &NodeId,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let peer_index = self
            .get_sync_peer_index(node_id)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
        let sync_peer = &self.sync_peers[peer_index];
        self.hooks.call_on_starting_hook(sync_peer);

        let mut conn = self.dial_sync_peer(node_id).await?;
        debug!(
            target: LOG_TARGET,
            "Attempting to synchronize headers with `{}`", node_id
        );

        let config = RpcClient::builder()
            .with_deadline(self.config.rpc_deadline)
            .with_deadline_grace_period(Duration::from_secs(5));
        let mut client = conn
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
            .await?;

        let latency = client
            .get_last_request_latency()
            .expect("unreachable panic: last request latency must be set after connect");
        self.sync_peers[peer_index].set_latency(latency);
        if latency > max_latency {
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                peer: conn.peer_node_id().clone(),
                latency,
                max_latency,
            });
        }

        debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
        let sync_peer = self.sync_peers[peer_index].clone();
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
        Ok((sync_peer, sync_result))
    }

    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
        let timer = Instant::now();
        debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
        info!(
            target: LOG_TARGET,
            "Successfully dialed sync peer {} in {:.2?}",
            node_id,
            timer.elapsed()
        );
        Ok(conn)
    }

    async fn attempt_sync(
        &mut self,
        sync_peer: &SyncPeer,
        mut client: rpc::BaseNodeSyncRpcClient,
        max_latency: Duration,
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
            sync_peer.node_id(),
            latency.unwrap_or_default().as_millis()
        );

        // Fetch best local data at the beginning of the sync process
        let best_block_metadata = self.db.get_chain_metadata().await?;
        let best_header = self.db.fetch_last_chain_header().await?;
        let best_block_header = self
            .db
            .fetch_chain_header(best_block_metadata.best_block_height())
            .await?;
        let best_header_height = best_header.height();
        let best_block_height = best_block_header.height();

        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
        {
            return Err(BlockHeaderSyncError::ChainStorageError(
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
            ));
        }

        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
        //   peer, but they claimed better POW before we attempted sync.
        // - This method will return ban-able errors for certain offenses.
        let (header_sync_status, peer_response) = self
            .determine_sync_status(sync_peer, best_header.clone(), best_block_header.clone(), &mut client)
            .await?;

        match header_sync_status.clone() {
            HeaderSyncStatus::InSyncOrAhead => {
                debug!(
                    target: LOG_TARGET,
                    "Headers are in sync at height {} but tip is {}. Proceeding to archival/pruned block sync",
                    best_header_height,
                    best_block_height
                );

                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
            HeaderSyncStatus::Lagging(split_info) => {
                self.hooks.call_on_progress_header_hooks(
                    split_info
                        .best_block_header
                        .height()
                        .checked_sub(split_info.reorg_steps_back)
                        .unwrap_or_default(),
                    sync_peer.claimed_chain_metadata().best_block_height(),
                    sync_peer,
                );
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
                    .await?;
                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
        }
    }

    #[allow(clippy::too_many_lines)]
    async fn find_chain_split(
        &mut self,
        peer_node_id: &NodeId,
        client: &mut rpc::BaseNodeSyncRpcClient,
        header_count: u64,
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
        // and keep us busy going back until the genesis.
        // 20 x 500 = max 10,000 block split can be detected
        const MAX_CHAIN_SPLIT_ITERS: usize = 20;

        let mut offset = 0;
        let mut iter_count = 0;
        loop {
            iter_count += 1;
            if iter_count > MAX_CHAIN_SPLIT_ITERS {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                    peer_node_id,
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
                );
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
            }

            let block_hashes = self
                .db
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
                .await?;
            debug!(
                target: LOG_TARGET,
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
                offset,
                offset + NUM_CHAIN_SPLIT_HEADERS,
                peer_node_id,
                block_hashes.len()
            );

            // No further hashes to send.
            if block_hashes.is_empty() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                    peer_node_id,
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
                );
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
            }

            let request = FindChainSplitRequest {
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
                header_count,
            };

            let resp = match client.find_chain_split(request).await {
                Ok(r) => r,
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
                    // send. Exit early in this case.
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
                        warn!(
                            target: LOG_TARGET,
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                            peer_node_id,
                            NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
                        );
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
                    }
                    // Chain split not found, let's go further back
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
                    continue;
                },
                Err(err) => {
                    return Err(err.into());
                },
            };
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
                    peer_node_id,
                    resp.headers.len(),
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
                );
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
            }
            if resp.fork_hash_index >= block_hashes.len() as u64 {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
                    peer_node_id,
                    resp.fork_hash_index,
                    block_hashes.len(),
                );
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
                    block_hashes.len() as u64,
                    resp.fork_hash_index,
                ));
            }
            #[allow(clippy::cast_possible_truncation)]
            if !resp.headers.is_empty() && resp.headers[0].prev_hash != block_hashes[resp.fork_hash_index as usize] {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
                    peer_node_id,
                    resp.fork_hash_index,
                );
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
                    "Peer sent incorrect fork hash index".into(),
                ));
            }
            #[allow(clippy::cast_possible_truncation)]
            let chain_split_hash = block_hashes[resp.fork_hash_index as usize];

            return Ok(FindChainSplitResult {
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
                peer_headers: resp.headers,
                peer_fork_hash_index: resp.fork_hash_index,
                chain_split_hash,
            });
        }
    }

    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
    /// of the chain split (see [HeaderSyncStatus]).
    ///
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
    async fn determine_sync_status(
        &mut self,
        sync_peer: &SyncPeer,
        best_header: ChainHeader,
        best_block_header: ChainHeader,
        client: &mut rpc::BaseNodeSyncRpcClient,
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
        // This method will return ban-able errors for certain offenses.
        let chain_split_result = self
            .find_chain_split(sync_peer.node_id(), client, HEADER_SYNC_INITIAL_MAX_HEADERS as u64)
            .await?;
        if chain_split_result.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Found chain split {} blocks back, received {} headers from peer `{}`",
                chain_split_result.reorg_steps_back,
                chain_split_result.peer_headers.len(),
                sync_peer
            );
        }

        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
        // accumulated difficulty.
        if chain_split_result.peer_headers.is_empty() {
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
            // we can ban the peer.
            if best_header.height() == best_block_header.height() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
                    sync_peer.node_id(),
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    best_block_header.accumulated_data().total_accumulated_difficulty,
                );
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: None,
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
                });
            }
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
        }

        let headers = chain_split_result
            .peer_headers
            .clone()
            .into_iter()
            .map(BlockHeader::try_from)
            .collect::<Result<Vec<_>, _>>()
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
        let num_new_headers = headers.len();
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
        // the list
        if self.db.fetch_header_by_block_hash(headers[0].hash()).await?.is_some() {
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                "Header already in database".to_string(),
            ));
        };

        self.header_validator
            .initialize_state(&chain_split_result.chain_split_hash)
            .await?;
        for header in headers {
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({})",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
            );
            self.header_validator.validate(header).await?;
        }

        debug!(
            target: LOG_TARGET,
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
        );

        let chain_split_info = ChainSplitInfo {
            best_block_header,
            reorg_steps_back: chain_split_result.reorg_steps_back,
            chain_split_hash: chain_split_result.chain_split_hash,
        };
        Ok((
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
            chain_split_result,
        ))
    }

    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
        debug!(
            target: LOG_TARGET,
            "Deleting headers that no longer form part of the main chain up until split at {}",
            split_hash.to_hex()
        );

        let blocks = self.db.rewind_to_hash(split_hash).await?;
        debug!(
            target: LOG_TARGET,
            "Rewound {} block(s) in preparation for header sync",
            blocks.len()
        );
        Ok(blocks)
    }

    #[allow(clippy::too_many_lines)]
    async fn synchronize_headers(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        split_info: ChainSplitInfo,
        max_latency: Duration,
    ) -> Result<(), BlockHeaderSyncError> {
        info!(target: LOG_TARGET, "Starting header sync from peer {}", sync_peer);
        const COMMIT_EVERY_N_HEADERS: usize = 1000;

        let mut has_switched_to_new_chain = false;
        let pending_len = self.header_validator.valid_headers().len();

        // Find the hash to start syncing the rest of the headers.
        // The expectation cannot fail because there has been at least one valid header returned (checked in
        // determine_sync_status)
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
            .header_validator
            .current_valid_chain_tip_header()
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");

        // If we already have a stronger chain at this point, switch over to it.
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);

        if has_better_pow {
            debug!(
                target: LOG_TARGET,
                "Remote chain from peer {} has higher PoW. Switching",
                sync_peer.node_id()
            );
            self.switch_to_pending_chain(&split_info).await?;
            has_switched_to_new_chain = true;
        }

        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
            // Peer returned less than the max number of requested headers. This indicates that we have all the
            // available headers from the peer.
            if !has_better_pow {
                // Because the pow is less or equal than the current chain the peer had to have lied about their pow
                debug!(target: LOG_TARGET, "No further headers to download");
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: Some(total_accumulated_difficulty),
                    local: split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty,
                });
            }
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
            // to block sync.
            return Ok(());
        }

        debug!(
            target: LOG_TARGET,
            "Download remaining headers starting from header #{} from peer `{}`",
            start_header_height,
            sync_peer.node_id()
        );
        let request = SyncHeadersRequest {
            start_hash: start_header_hash.to_vec(),
            // To the tip!
            count: 0,
        };

        let mut header_stream = client.sync_headers(request).await?;
        debug!(
            target: LOG_TARGET,
            "Reading headers from peer `{}`",
            sync_peer.node_id()
        );

        let mut last_sync_timer = Instant::now();

        let mut last_total_accumulated_difficulty = U512::zero();
        let mut avg_latency = RollingAverageTime::new(20);
        let mut prev_height: Option<u64> = None;
        while let Some(header) = header_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
                latency
            );
            trace!(
                target: LOG_TARGET,
                "{}",
                header
            );
            if let Some(prev_header_height) = prev_height {
                if header.height != prev_header_height.saturating_add(1) {
                    warn!(
                        target: LOG_TARGET,
                        "Received header #{} `{}` does not follow previous header",
                        header.height,
                        header.hash().to_hex()
                    );
                    return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                        "Header does not follow previous header".to_string(),
                    ));
                }
            }
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
            if let Some(h) = existing_header {
                warn!(
                    target: LOG_TARGET,
                    "Received header #{} `{}` that we already have.",
                    h.height,
                    h.hash().to_hex()
                );
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                    "Header already in database".to_string(),
                ));
            }
            let current_height = header.height;
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;

            if has_switched_to_new_chain {
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
                    self.commit_pending_headers().await?;
                }
            } else {
                // The remote chain has not (yet) been accepted.
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
                // achieved.
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                    self.switch_to_pending_chain(&split_info).await?;
                    has_switched_to_new_chain = true;
                }
            }

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            self.hooks.call_on_progress_header_hooks(
                current_height,
                sync_peer.claimed_chain_metadata().best_block_height(),
                &sync_peer,
            );

            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
            if let Some(avg_latency) = last_avg_latency {
                if avg_latency > max_latency {
                    return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                        peer: sync_peer.node_id().clone(),
                        latency: avg_latency,
                        max_latency,
                    });
                }
            }

            last_sync_timer = Instant::now();
            prev_height = Some(current_height);
        }

        if !has_switched_to_new_chain {
            if sync_peer.claimed_chain_metadata().accumulated_difficulty() <
                self.header_validator
                    .current_valid_chain_tip_header()
                    .map(|h| h.accumulated_data().total_accumulated_difficulty)
                    .unwrap_or_default()
            {
                // We should only return this error if the peer sent a PoW less than they advertised.
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: self
                        .header_validator
                        .current_valid_chain_tip_header()
                        .map(|h| h.accumulated_data().total_accumulated_difficulty),
                    local: split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty,
                });
            } else {
                warn!(
                    target: LOG_TARGET,
                    "Received pow from peer matches claimed difficulty #{} but local is higher: ({}) and we have not \
                     swapped. Ignoring",
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty
                );
                return Ok(());
            }
        }

        // Commit the last headers that don't fit into the COMMIT_EVERY_N_HEADERS batches
        if !self.header_validator.valid_headers().is_empty() {
            self.commit_pending_headers().await?;
        }

        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
        // some other external factor like a disconnect etc), we detect this and ban the peer.
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                claimed: claimed_total_accumulated_diff,
                actual: Some(last_total_accumulated_difficulty),
                local: split_info
                    .best_block_header
                    .accumulated_data()
                    .total_accumulated_difficulty,
            });
        }

        Ok(())
    }

    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
        let chain_headers = self.header_validator.take_valid_headers();
        let num_headers = chain_headers.len();
        let start = Instant::now();

        let new_tip = chain_headers.last().cloned().unwrap();
        let mut txn = self.db.write_transaction();
        chain_headers.into_iter().for_each(|chain_header| {
            txn.insert_chain_header(chain_header);
        });

        txn.commit().await?;

        debug!(
            target: LOG_TARGET,
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
            num_headers,
            new_tip.height(),
            start.elapsed()
        );

        Ok(new_tip)
    }

    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
        let chain_headers = self.header_validator.valid_headers();
        if chain_headers.is_empty() {
            return false;
        }

        // Check that the remote tip is stronger than the local tip, equal should not have ended up here, so we treat
        // equal as less
        let proposed_tip = chain_headers.last().unwrap();
        self.header_validator.compare_chains(current_tip, proposed_tip).is_lt()
    }

    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
        // Reorg if required
        if split_info.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
                split_info.reorg_steps_back,
                split_info.chain_split_hash.to_hex()
            );
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
            if !blocks.is_empty() {
                self.hooks.call_on_rewind_hooks(blocks);
            }
        }

        // Commit the forked chain. At this point
        // 1. Headers have been validated
        // 2. The forked chain has a higher PoW than the local chain
        //
        // After this we commit headers every `n` blocks
        self.commit_pending_headers().await?;

        Ok(())
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index to the node_id inside of the vec of peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }
}

#[derive(Debug, Clone)]
struct FindChainSplitResult {
    reorg_steps_back: u64,
    peer_headers: Vec<ProtoBlockHeader>,
    peer_fork_hash_index: u64,
    chain_split_hash: HashOutput,
}

/// Information about the chain split from the remote node.
#[derive(Debug, Clone, PartialEq)]
pub struct ChainSplitInfo {
    /// The best block's header on the local chain.
    pub best_block_header: ChainHeader,
    /// The number of blocks to reorg back to the fork.
    pub reorg_steps_back: u64,
    /// The hash of the block at the fork.
    pub chain_split_hash: HashOutput,
}

/// The result of an attempt to synchronize headers with a peer.
#[derive(Debug, Clone, PartialEq)]
pub struct AttemptSyncResult {
    /// The number of headers that were returned.
    pub headers_returned: u64,
    /// The fork hash index of the remote peer.
    pub peer_fork_hash_index: u64,
    /// The header sync status.
    pub header_sync_status: HeaderSyncStatus,
}

#[derive(Debug, Clone, PartialEq)]
pub enum HeaderSyncStatus {
    /// Local and remote node are in sync or ahead
    InSyncOrAhead,
    /// Local node is lagging behind remote node
    Lagging(Box<ChainSplitInfo>),
}
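
For orientation, here is a hedged usage sketch of the public API shown above (hook registration followed by a single sync attempt). It assumes the module's own types (`HeaderSynchronizer`, `BlockHeaderSyncError`, `BlockchainBackend`) are in scope and that the synchronizer's dependencies (database handle, connectivity, consensus rules) have been constructed elsewhere; the helper name `run_header_sync` is illustrative, not part of the codebase.

```rust
// Hedged sketch: drive one header sync attempt with progress/rewind hooks.
// Assumes the surrounding module's types are in scope; `run_header_sync` is hypothetical.
async fn run_header_sync<B: BlockchainBackend + 'static>(
    mut synchronizer: HeaderSynchronizer<'_, B>,
) -> Result<(), BlockHeaderSyncError> {
    // Fires once when a sync peer is selected.
    synchronizer.on_starting(|peer| {
        log::info!("header sync starting with peer {}", peer.node_id());
    });
    // Fires per header batch with (current height, remote claimed tip, peer).
    synchronizer.on_progress(|current, remote_tip, peer| {
        log::debug!("header {}/{} from {}", current, remote_tip, peer.node_id());
    });
    // Fires if a reorg rewinds blocks before switching to the stronger chain.
    synchronizer.on_rewind(|blocks| {
        log::warn!("rewound {} block(s) before header sync", blocks.len());
    });

    let (peer, result) = synchronizer.synchronize().await?;
    log::info!(
        "header sync with {} finished: {} headers, status {:?}",
        peer.node_id(),
        result.headers_returned,
        result.header_sync_status
    );
    Ok(())
}
```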