
tari-project / tari / 26223687530

21 May 2026 11:40AM UTC coverage: 60.422% (-0.02%) from 60.442%

push

github

web-flow
feat(xmrig-proxy): add getinfo and getheight methods to the node's integrated xmrig_proxy (#7827)

Description
---
Some third-party applications request getinfo and/or getheight.

This PR adds dedicated handlers for getinfo and getheight while
preserving compatibility with clients that may be calling get_height or
getblockcount.

Modifications:
- Implemented handle_get_height_hash() and handle_get_info() with
explicit routing
- Extracted shared chain tip into get_chain_tip() for use in all
handlers
- Registered corresponding GET paths (/getheight, /getinfo, /get_info)
- Added Cucumber integration tests validating all endpoints and dynamic
height updates

These additions improve miner interoperability without altering the
existing proxy functionality. Redundant node service calls are
consolidated.
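
The routing described above can be illustrated with a minimal sketch. It is not the PR's implementation: the `Handler` enum and the `route_method` and `route_get_path` functions are hypothetical names introduced here, and only the method names and GET paths are taken from the description.

```rust
/// Hypothetical handler categories, introduced for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Handler {
    /// Existing behaviour for get_height / getblockcount callers.
    LegacyHeight,
    /// Dedicated getheight handler (block hash alongside height).
    HeightWithHash,
    /// getinfo / get_info handler.
    Info,
}

/// Map a JSON-RPC method name to a handler category.
pub fn route_method(method: &str) -> Option<Handler> {
    match method {
        "get_height" | "getblockcount" => Some(Handler::LegacyHeight),
        "getheight" => Some(Handler::HeightWithHash),
        "getinfo" | "get_info" => Some(Handler::Info),
        _ => None,
    }
}

/// Map a GET path to a handler category (only the paths the PR lists as newly registered).
pub fn route_get_path(path: &str) -> Option<Handler> {
    match path {
        "/getheight" => Some(Handler::HeightWithHash),
        "/getinfo" | "/get_info" => Some(Handler::Info),
        _ => None,
    }
}
```

Keeping getheight on its own match arm is what leaves get_height and getblockcount callers unaffected while getheight callers receive the enriched response.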

Motivation and Context
---
Mining and mining pool software often expects a get_info/getinfo
endpoint and may use getheight instead of get_height. By giving
getheight its own handler rather than merging it with get_height, this
PR:

- Preserves compatibility — existing getblockcount/get_height clients
see no change
- Provides enriched data to getheight callers (block hash alongside
height)
- Adds compatible get_info and getinfo for applications expecting them

How Has This Been Tested?
---
Cucumber Test Coverage: Test Scenarios (6 total)
| # | Scenario | Method | Endpoint | Validation |
|---|----------|--------|----------|------------|
| 1 | JSON-RPC getheight | POST | getheight | Response contains height, id, status |
| 2 | JSON-RPC getinfo | POST | getinfo | Response contains height, id, status, credits |
| 3 | GET /getheight | GET | /getheight | Response contains height, id, status |
| 4 | GET /getinfo | GET | /getinfo | Response contains height, id, status, credits |
| 5 | JSON-RPC getheight reflects mined blocks | POST + mining | getheight | Height matches base node after mining 3 blocks |
| 6 | GET /getheight reflects mined blocks | GET + mining | /getheight | Height matches base node after mining 3 blocks |

... (continued)
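
The field checks in the scenarios above could be expressed as follows. This is a sketch, not the Cucumber step code: `required_fields` and `missing_fields` are hypothetical helpers, and the response is modelled simply as a list of the keys it contains.

```rust
// Required response fields per endpoint, taken from the scenario table.
const HEIGHT_FIELDS: &[&str] = &["height", "id", "status"];
const INFO_FIELDS: &[&str] = &["height", "id", "status", "credits"];

/// Look up the required fields for an endpoint, accepting it with or
/// without a leading slash (hypothetical helper).
pub fn required_fields(endpoint: &str) -> Option<&'static [&'static str]> {
    match endpoint.trim_start_matches('/') {
        "getheight" => Some(HEIGHT_FIELDS),
        "getinfo" => Some(INFO_FIELDS),
        _ => None,
    }
}

/// Return the required fields absent from `present`, in table order,
/// or `None` if the endpoint is not one of the two covered here.
pub fn missing_fields(endpoint: &str, present: &[&str]) -> Option<Vec<&'static str>> {
    required_fields(endpoint).map(|required| {
        required
            .iter()
            .copied()
            .filter(|field| !present.contains(field))
            .collect()
    })
}
```

An empty result means the response carries every field the corresponding scenario validates.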

70473 of 116635 relevant lines covered (60.42%)

222750.16 hits per line

Source File

60.34
/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use primitive_types::U512;
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
use tari_comms::{
    PeerConnection,
    connectivity::ConnectivityRequester,
    peer_manager::NodeId,
    protocol::rpc::{RpcClient, RpcError},
};
use tari_node_components::blocks::{BlockHeader, ChainBlock, ChainHeader};
use tari_transaction_components::BanPeriod;
use tari_utilities::hex::Hex;

pub(crate) use super::{BlockHeaderSyncError, validator::BlockHeaderSyncValidator};
use crate::{
    base_node::sync::{
        BlockchainSyncConfig,
        SyncPeer,
        ban::PeerBanManager,
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
        hooks::Hooks,
        rpc,
    },
    chain_storage::{BlockchainBackend, ChainStorageError, async_db::AsyncBlockchainDb},
    common::rolling_avg::RollingAverageTime,
    consensus::BaseNodeConsensusManager,
    proof_of_work::randomx_factory::RandomXFactory,
    proto::{
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
        core::BlockHeader as ProtoBlockHeader,
    },
};

const LOG_TARGET: &str = "c::bn::header_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct HeaderSynchronizer<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    header_validator: BlockHeaderSyncValidator<B>,
    connectivity: ConnectivityRequester,
    sync_peers: &'a mut Vec<SyncPeer>,
    hooks: Hooks,
    local_cached_metadata: &'a ChainMetadata,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        consensus_rules: BaseNodeConsensusManager,
        connectivity: ConnectivityRequester,
        sync_peers: &'a mut Vec<SyncPeer>,
        randomx_factory: RandomXFactory,
        local_metadata: &'a ChainMetadata,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            config,
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
            db,
            connectivity,
            sync_peers,
            hooks: Default::default(),
            local_cached_metadata: local_metadata,
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_progress_header_hook(hook);
    }

    pub fn on_rewind<H>(&mut self, hook: H)
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
        self.hooks.add_on_rewind_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        debug!(target: LOG_TARGET, "Starting header sync.",);

        info!(
            target: LOG_TARGET,
            "Synchronizing headers ({} candidate peers selected)",
            self.sync_peers.len()
        );

        // Hold `Arc<NodeId>` sync-list handles for each candidate peer. While these guards are
        // alive the connectivity manager marks the peers as "in use by sync", which prevents
        // opportunistic disconnects (e.g. DhtConnectivity random-pool pruning). The guards are
        // dropped automatically when this function returns.
        let mut _sync_guards: Vec<Arc<NodeId>> = Vec::with_capacity(self.sync_peers.len());
        for peer in self.sync_peers.iter() {
            match self.connectivity.add_peer_to_sync_list(peer.node_id().clone()).await {
                Ok(handle) => _sync_guards.push(handle),
                Err(e) => debug!(
                    target: LOG_TARGET,
                    "Failed to register sync peer {} on sync list: {e}", peer.node_id()
                ),
            }
        }

        self.synchronize_inner().await
    }

    async fn synchronize_inner(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let mut max_latency = self.config.initial_max_sync_latency;
        let mut latency_increases_counter = 0;
        loop {
            match self.try_sync_from_all_peers(max_latency).await {
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
                    // If we have few sync peers, throw this out to be retried later
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    }
                    max_latency += self.config.max_latency_increase;
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                },
                Err(err) => break Err(err),
            }
        }
    }

    #[allow(clippy::too_many_lines)]
    pub async fn try_sync_from_all_peers(
        &mut self,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync headers ({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
                Err(err) => {
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
                    if let Some(reason) = ban_reason {
                        warn!(target: LOG_TARGET, "{err}");
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
        }
    }

    async fn connect_and_attempt_sync(
        &mut self,
        node_id: &NodeId,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let peer_index = self
            .get_sync_peer_index(node_id)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
        self.hooks.call_on_starting_hook(sync_peer);

        let mut conn = self.dial_sync_peer(node_id).await?;
        debug!(
            target: LOG_TARGET,
            "Attempting to synchronize headers with `{node_id}`"
        );
        // Defensive: the connection may have been torn down by another subsystem between
        // dial returning and this point (e.g. DhtConnectivity pruning). This is not the peer's
        // fault, so use NotInSync (no ban) to trigger a skip-and-retry with the next peer.
        if !conn.is_connected() {
            warn!(
                target: LOG_TARGET,
                "Sync peer `{node_id}` was disconnected before RPC negotiation could begin"
            );
            return Err(BlockHeaderSyncError::NotInSync);
        }

        let config = RpcClient::builder()
            .with_deadline(self.config.rpc_deadline)
            .with_deadline_grace_period(Duration::from_secs(5));
        // Bound RPC negotiation so a stuck negotiation cannot wedge the sync loop.
        let mut client = tokio::time::timeout(
            self.config.rpc_deadline,
            conn.connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config),
        )
        .await
        .map_err(|_| BlockHeaderSyncError::RpcError(RpcError::ReplyTimeout))??;

        let latency = client
            .get_last_request_latency()
            .expect("unreachable panic: last request latency must be set after connect");
        self.sync_peers
            .get_mut(peer_index)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
            .set_latency(latency);
        if latency > max_latency {
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                peer: conn.peer_node_id().clone(),
                latency,
                max_latency,
            });
        }

        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
        let sync_peer = self
            .sync_peers
            .get(peer_index)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
            .clone();
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
        Ok((sync_peer, sync_result))
    }

    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
        let timer = Instant::now();
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
        info!(
            target: LOG_TARGET,
            "Successfully dialed sync peer {} in {:.2?}",
            node_id,
            timer.elapsed()
        );
        Ok(conn)
    }

    async fn attempt_sync(
        &mut self,
        sync_peer: &SyncPeer,
        mut client: rpc::BaseNodeSyncRpcClient,
        max_latency: Duration,
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
            sync_peer.node_id(),
            latency.unwrap_or_default().as_millis()
        );

        // Fetch best local data at the beginning of the sync process
        let best_block_metadata = self.db.get_chain_metadata().await?;
        let best_header = self.db.fetch_last_chain_header().await?;
        let best_block_header = self
            .db
            .fetch_chain_header(best_block_metadata.best_block_height())
            .await?;
        let best_header_height = best_header.height();
        let best_block_height = best_block_header.height();

        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
        {
            return Err(BlockHeaderSyncError::ChainStorageError(
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
            ));
        }

        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
        //   peer, but they claimed better POW before we attempted sync.
        // - This method will return ban-able errors for certain offenses.
        let (header_sync_status, peer_response) = self
            .determine_sync_status(
                sync_peer,
                best_header.clone(),
                best_block_header.clone(),
                self.config.max_reorg_depth_allowed,
                &mut client,
            )
            .await?;

        match header_sync_status.clone() {
            HeaderSyncStatus::InSyncOrAhead => {
                debug!(
                    target: LOG_TARGET,
                    "Headers are in sync at height {best_header_height} but tip is {best_block_height}. Proceeding to archival/pruned block sync"
                );

                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
            HeaderSyncStatus::Lagging(split_info) => {
                self.hooks.call_on_progress_header_hooks(
                    split_info
                        .best_block_header
                        .height()
                        .saturating_sub(split_info.reorg_steps_back),
                    sync_peer.claimed_chain_metadata().best_block_height(),
                    sync_peer,
                );
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
                    .await?;
                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
        }
    }

    #[allow(clippy::too_many_lines)]
    async fn find_chain_split(
        &mut self,
        peer_node_id: &NodeId,
        max_reorg_depth_allowed: usize,
        client: &mut rpc::BaseNodeSyncRpcClient,
        header_count: u64,
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
        // and keep us busy going back until the genesis.
        // 20 x 500 = max 10,000 block split can be detected. The 10_000 limit is default, but can be overridden
        let max_chain_split_iters = max_reorg_depth_allowed.saturating_div(NUM_CHAIN_SPLIT_HEADERS);

        let mut offset = 0;
        let mut iter_count = 0;
        loop {
            iter_count += 1;
            if iter_count > max_chain_split_iters {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                    peer_node_id,
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
                );
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
            }

            let block_hashes = self
                .db
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
                .await?;
            debug!(
                target: LOG_TARGET,
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
                offset,
                offset + NUM_CHAIN_SPLIT_HEADERS,
                peer_node_id,
                block_hashes.len()
            );

            // No further hashes to send.
            if block_hashes.is_empty() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                    peer_node_id,
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
                );
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
            }

            let request = FindChainSplitRequest {
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
                header_count,
            };

            let resp = match client.find_chain_split(request).await {
                Ok(r) => r,
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
                    // send. Exit early in this case.
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
                        warn!(
                            target: LOG_TARGET,
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
                            peer_node_id,
                            NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
                        );
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
                    }
                    // Chain split not found, let's go further back
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
                    continue;
                },
                Err(err) => {
                    return Err(err.into());
                },
            };
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
                    peer_node_id,
                    resp.headers.len(),
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
                );
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
            }
            if resp.fork_hash_index >= block_hashes.len() as u64 {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
                    peer_node_id,
                    resp.fork_hash_index,
                    block_hashes.len(),
                );
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
                    block_hashes.len() as u64,
                    resp.fork_hash_index,
                ));
            }
            #[allow(clippy::cast_possible_truncation)]
            if !resp.headers.is_empty() &&
                *resp.headers.first().expect("Already checked").prev_hash !=
                    *block_hashes
                        .get(resp.fork_hash_index as usize)
                        .expect("Already checked")
            {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
                    peer_node_id,
                    resp.fork_hash_index,
                );
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
                    "Peer sent incorrect fork hash index".into(),
                ));
            }
            #[allow(clippy::cast_possible_truncation)]
            let chain_split_hash = *block_hashes
                .get(resp.fork_hash_index as usize)
                .expect("Already checked");

            return Ok(FindChainSplitResult {
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
                peer_headers: resp.headers,
                peer_fork_hash_index: resp.fork_hash_index,
                chain_split_hash,
            });
        }
    }

    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
    /// of the chain split (see [HeaderSyncStatus]).
    ///
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
    async fn determine_sync_status(
        &mut self,
        sync_peer: &SyncPeer,
        best_header: ChainHeader,
        best_block_header: ChainHeader,
        max_reorg_depth_allowed: usize,
        client: &mut rpc::BaseNodeSyncRpcClient,
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
        // This method will return ban-able errors for certain offenses.
        let chain_split_result = self
            .find_chain_split(
                sync_peer.node_id(),
                max_reorg_depth_allowed,
                client,
                HEADER_SYNC_INITIAL_MAX_HEADERS as u64,
            )
            .await?;
        if chain_split_result.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Found chain split {} blocks back, received {} headers from peer `{}`",
                chain_split_result.reorg_steps_back,
                chain_split_result.peer_headers.len(),
                sync_peer
            );
        }

        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
        // accumulated difficulty.
        if chain_split_result.peer_headers.is_empty() {
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
            // we can ban the peer.
            if best_header.height() == best_block_header.height() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
                    sync_peer.node_id(),
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    best_block_header.accumulated_data().total_accumulated_difficulty,
                );
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: None,
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
                });
            }
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
        }

        let headers = chain_split_result
            .peer_headers
            .clone()
            .into_iter()
            .map(BlockHeader::try_from)
            .collect::<Result<Vec<_>, _>>()
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
        let num_new_headers = headers.len();
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
        // the list
        if self
            .db
            .fetch_header_by_block_hash(headers.first().expect("Already checked").hash())
            .await?
            .is_some()
        {
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                "Header already in database".to_string(),
            ));
        };

        self.header_validator
            .initialize_state(&chain_split_result.chain_split_hash)
            .await?;
        for header in headers {
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({})",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
            );
            self.header_validator.validate(header).await?;
        }

        debug!(
            target: LOG_TARGET,
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
        );

        let chain_split_info = ChainSplitInfo {
            best_block_header,
            reorg_steps_back: chain_split_result.reorg_steps_back,
            chain_split_hash: chain_split_result.chain_split_hash,
        };
        Ok((
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
            chain_split_result,
        ))
    }

    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
        debug!(
            target: LOG_TARGET,
            "Deleting headers that no longer form part of the main chain up until split at {}",
            split_hash.to_hex()
        );

        let blocks = self.db.rewind_to_hash(split_hash).await?;
        debug!(
            target: LOG_TARGET,
            "Rewound {} block(s) in preparation for header sync",
            blocks.len()
        );
        Ok(blocks)
    }

    #[allow(clippy::too_many_lines)]
    async fn synchronize_headers(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        split_info: ChainSplitInfo,
        max_latency: Duration,
    ) -> Result<(), BlockHeaderSyncError> {
        info!(target: LOG_TARGET, "Starting header sync from peer {sync_peer}");
        const COMMIT_EVERY_N_HEADERS: usize = 1000;

        let mut has_switched_to_new_chain = false;
        let pending_len = self.header_validator.valid_headers().len();

        // Find the hash to start syncing the rest of the headers.
        // The expectation cannot fail because there has been at least one valid header returned (checked in
        // determine_sync_status)
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
            .header_validator
            .current_valid_chain_tip_header()
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");

        // If the pending headers already form a stronger chain at this point, switch over to it,
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);

        if has_better_pow {
            debug!(
                target: LOG_TARGET,
                "Remote chain from peer {} has higher PoW. Switching",
                sync_peer.node_id()
            );
            self.switch_to_pending_chain(&split_info).await?;
            has_switched_to_new_chain = true;
        }

        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
            // Peer returned fewer than the max number of requested headers. This indicates that we have all the
            // available headers from the peer.
            if !has_better_pow {
                // Because the PoW is less than or equal to that of the current chain, the peer must have lied about
                // their PoW
                debug!(target: LOG_TARGET, "No further headers to download");
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: Some(total_accumulated_difficulty),
                    local: split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty,
                });
            }
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
            // to block sync.
            return Ok(());
        }

        debug!(
            target: LOG_TARGET,
            "Download remaining headers starting from header #{} from peer `{}`",
            start_header_height,
            sync_peer.node_id()
        );
        let request = SyncHeadersRequest {
            start_hash: start_header_hash.to_vec(),
            // To the tip!
            count: 0,
        };

        let mut header_stream = client.sync_headers(request).await?;
        debug!(
            target: LOG_TARGET,
            "Reading headers from peer `{}`",
            sync_peer.node_id()
        );

        let mut last_sync_timer = Instant::now();

        let mut last_total_accumulated_difficulty = U512::zero();
        let mut avg_latency = RollingAverageTime::new(20);
        let mut prev_height: Option<u64> = None;
        while let Some(header) = header_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
                latency
            );
            trace!(
                target: LOG_TARGET,
                "{header}"
            );
            if let Some(prev_header_height) = prev_height &&
                header.height != prev_header_height.saturating_add(1)
            {
                warn!(
                    target: LOG_TARGET,
                    "Received header #{} `{}` does not follow previous header",
                    header.height,
                    header.hash().to_hex()
                );
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                    "Header does not follow previous header".to_string(),
                ));
            }
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
            if let Some(h) = existing_header {
                warn!(
                    target: LOG_TARGET,
                    "Received header #{} `{}` that we already have.",
                    h.height,
                    h.hash().to_hex()
                );
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                    "Header already in database".to_string(),
                ));
            }
            let current_height = header.height;
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;

            if has_switched_to_new_chain {
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
                    self.commit_pending_headers().await?;
                }
            } else {
                // The remote chain has not (yet) been accepted.
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
                // achieved.
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                    self.switch_to_pending_chain(&split_info).await?;
                    has_switched_to_new_chain = true;
                }
            }

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            self.hooks.call_on_progress_header_hooks(
                current_height,
                sync_peer.claimed_chain_metadata().best_block_height(),
                &sync_peer,
            );

            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
            if let Some(avg_latency) = last_avg_latency &&
                avg_latency > max_latency
            {
                return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                    peer: sync_peer.node_id().clone(),
                    latency: avg_latency,
                    max_latency,
                });
            }

            last_sync_timer = Instant::now();
            prev_height = Some(current_height);
        }

        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
        if !has_switched_to_new_chain {
            let best_local_before_sync = split_info
                .best_block_header
                .accumulated_data()
                .total_accumulated_difficulty;
            match self
                .header_validator
                .current_valid_chain_tip_header()
                .map(|h| h.accumulated_data().total_accumulated_difficulty)
            {
                Some(validated_total_accumulated_diff) => {
                    if claimed_total_accumulated_diff > validated_total_accumulated_diff {
                        // Over-claim: peer advertised more PoW than their headers actually provide.
                        return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                            claimed: claimed_total_accumulated_diff,
                            actual: Some(validated_total_accumulated_diff),
                            local: best_local_before_sync,
                        });
                    } else if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                        self.switch_to_pending_chain(&split_info).await?;
                        has_switched_to_new_chain = true;
                        info!(
                            target: LOG_TARGET,
                            "Received PoW from peer exceeds local tip. Before sync: {}, received: {}. Committed.",
                            best_local_before_sync,
                            validated_total_accumulated_diff,
                        );
                    } else {
                        // We have a stronger chain, so we do not commit the headers.
                        debug!(
                            target: LOG_TARGET,
                            "Not committing headers as we have a stronger chain, ours: {} theirs: {}",
                            best_local_before_sync,
                            validated_total_accumulated_diff,
                        );
                    };
                },
                None => {
                    // No validated headers at this stage, but there should have been.
                    return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                        claimed: claimed_total_accumulated_diff,
                        actual: None,
                        local: best_local_before_sync,
                    });
                },
            }
        }

        // Commit the last headers only if we have switched to the new chain.
        if has_switched_to_new_chain && !self.header_validator.valid_headers().is_empty() {
            self.commit_pending_headers().await?;
        }

        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
        // some other external factor like a disconnect etc), we detect this and ban the peer.
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                claimed: claimed_total_accumulated_diff,
                actual: Some(last_total_accumulated_difficulty),
                local: split_info
                    .best_block_header
                    .accumulated_data()
                    .total_accumulated_difficulty,
            });
        }

        Ok(())
    }

    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
        let chain_headers = self.header_validator.take_valid_headers();
        let num_headers = chain_headers.len();
        let start = Instant::now();

        let new_tip = chain_headers.last().cloned().unwrap();
        let mut txn = self.db.write_transaction();
        chain_headers.into_iter().for_each(|chain_header| {
            txn.insert_chain_header(chain_header);
        });

        txn.commit().await?;

        debug!(
            target: LOG_TARGET,
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
            num_headers,
            new_tip.height(),
            start.elapsed()
        );

        Ok(new_tip)
    }

    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
        let chain_headers = self.header_validator.valid_headers();
        if chain_headers.is_empty() {
            return false;
        }

        // Check that the remote tip is stronger than the local tip. A chain of equal strength should not have ended
        // up here, so we treat equal as less.
        let proposed_tip = chain_headers.last().unwrap();
        self.header_validator.compare_chains(current_tip, proposed_tip).is_lt()
    }

    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
        // Reorg if required
        if split_info.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
                split_info.reorg_steps_back,
                split_info.chain_split_hash.to_hex()
            );
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
            if !blocks.is_empty() {
                self.hooks.call_on_rewind_hooks(blocks);
            }
        }

        // Commit the forked chain. At this point
        // 1. Headers have been validated
        // 2. The forked chain has a higher PoW than the local chain
        //
        // After this we commit headers every `n` blocks
        self.commit_pending_headers().await?;

        Ok(())
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index of the given node_id within the vec of sync peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }
}

#[derive(Debug, Clone)]
struct FindChainSplitResult {
    reorg_steps_back: u64,
    peer_headers: Vec<ProtoBlockHeader>,
    peer_fork_hash_index: u64,
    chain_split_hash: HashOutput,
}

/// Information about the chain split from the remote node.
#[derive(Debug, Clone, PartialEq)]
pub struct ChainSplitInfo {
    /// The best block's header on the local chain.
    pub best_block_header: ChainHeader,
    /// The number of blocks to reorg back to the fork.
    pub reorg_steps_back: u64,
    /// The hash of the block at the fork.
    pub chain_split_hash: HashOutput,
}

/// The result of an attempt to synchronize headers with a peer.
#[derive(Debug, Clone, PartialEq)]
pub struct AttemptSyncResult {
    /// The number of headers that were returned.
    pub headers_returned: u64,
    /// The fork hash index of the remote peer.
    pub peer_fork_hash_index: u64,
    /// The header sync status.
    pub header_sync_status: HeaderSyncStatus,
}

#[derive(Debug, Clone, PartialEq)]
pub enum HeaderSyncStatus {
    /// Local node is in sync with, or ahead of, the remote node
    InSyncOrAhead,
    /// Local node is lagging behind remote node
    Lagging(Box<ChainSplitInfo>),
}