• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 25106105978

29 Apr 2026 11:25AM UTC coverage: 61.025% (+0.08%) from 60.949%
25106105978

push

github

web-flow
fix(deps): upgrade diesel to 2.3.8 for RUSTSEC-2026-0111 (#7790)

Description
---
Upgrade Diesel dependencies from 2.2.10 to 2.3.8 across Tari crates that
use the SQLite backend, and refresh diesel_migrations to 2.3.

This addresses the RUSTSEC-2026-0111 advisory (possible UTF-8 corruption
unsoundness in Diesel's SQLite backend).

Motivation and Context
---
Issue #7787 reports that the workspace lockfile includes vulnerable
diesel 2.2.10.

This patch keeps the scope limited to dependency updates in crates that
directly depend on Diesel:
- common_sqlite
- comms/core
- comms/dht
- base_layer/wallet
- base_layer/transaction_key_manager

How Has This Been Tested?
---
- cargo check --locked --ignore-rust-version -p tari_common_sqlite -p
tari_comms -p tari_comms_dht -p tari_transaction_key_manager -p
minotari_wallet

Note: local toolchain in this environment is rustc 1.92.0 while
workspace requires 1.93.0; therefore --ignore-rust-version was used for
local verification.

What process can a PR reviewer use to test or verify this change?
---
1. Confirm Cargo.toml Diesel version bumps in the five crates above.
2. Confirm Cargo.lock no longer contains diesel 2.2.10.
3. Run:
cargo check --locked -p tari_common_sqlite -p tari_comms -p
tari_comms_dht -p tari_transaction_key_manager -p minotari_wallet

Breaking Changes
---
- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

Closes #7787.

70852 of 116103 relevant lines covered (61.03%)

223856.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.34
/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    sync::Arc,
24
    time::{Duration, Instant},
25
};
26

27
use futures::StreamExt;
28
use log::*;
29
use primitive_types::U512;
30
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
31
use tari_comms::{
32
    PeerConnection,
33
    connectivity::ConnectivityRequester,
34
    peer_manager::NodeId,
35
    protocol::rpc::{RpcClient, RpcError},
36
};
37
use tari_node_components::blocks::{BlockHeader, ChainBlock, ChainHeader};
38
use tari_transaction_components::BanPeriod;
39
use tari_utilities::hex::Hex;
40

41
pub(crate) use super::{BlockHeaderSyncError, validator::BlockHeaderSyncValidator};
42
use crate::{
43
    base_node::sync::{
44
        BlockchainSyncConfig,
45
        SyncPeer,
46
        ban::PeerBanManager,
47
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
48
        hooks::Hooks,
49
        rpc,
50
    },
51
    chain_storage::{BlockchainBackend, ChainStorageError, async_db::AsyncBlockchainDb},
52
    common::rolling_avg::RollingAverageTime,
53
    consensus::BaseNodeConsensusManager,
54
    proof_of_work::randomx_factory::RandomXFactory,
55
    proto::{
56
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
57
        core::BlockHeader as ProtoBlockHeader,
58
    },
59
};
60

61
/// Log target for all header-sync log output.
const LOG_TARGET: &str = "c::bn::header_sync";

/// Maximum number of times the allowed sync latency may be raised (by
/// `config.max_latency_increase` each time in `synchronize_inner`) before
/// header sync gives up with `AllSyncPeersExceedLatency`.
const MAX_LATENCY_INCREASES: usize = 5;
64

65
/// Synchronizes block headers from a set of candidate sync peers, banning or
/// discarding peers that misbehave or exceed the configured latency budget.
pub struct HeaderSynchronizer<'a, B> {
    /// Sync tuning parameters (latency budgets, ban periods, reorg depth, RPC deadline).
    config: BlockchainSyncConfig,
    /// Async handle to the blockchain database.
    db: AsyncBlockchainDb<B>,
    /// Validates headers received from the remote peer against consensus rules.
    header_validator: BlockHeaderSyncValidator<B>,
    /// Used to dial peers and register them on the connectivity sync list.
    connectivity: ConnectivityRequester,
    /// Candidate sync peers; failed peers are removed as the attempt proceeds.
    sync_peers: &'a mut Vec<SyncPeer>,
    /// Callbacks fired on sync start, header progress and chain rewind events.
    hooks: Hooks,
    /// Caller-provided chain metadata snapshot, used for local consistency checks.
    local_cached_metadata: &'a ChainMetadata,
    /// Applies short/long bans to misbehaving peers.
    peer_ban_manager: PeerBanManager,
}
75

76
impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
77
    pub fn new(
13✔
78
        config: BlockchainSyncConfig,
13✔
79
        db: AsyncBlockchainDb<B>,
13✔
80
        consensus_rules: BaseNodeConsensusManager,
13✔
81
        connectivity: ConnectivityRequester,
13✔
82
        sync_peers: &'a mut Vec<SyncPeer>,
13✔
83
        randomx_factory: RandomXFactory,
13✔
84
        local_metadata: &'a ChainMetadata,
13✔
85
    ) -> Self {
13✔
86
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
13✔
87
        Self {
13✔
88
            config,
13✔
89
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
13✔
90
            db,
13✔
91
            connectivity,
13✔
92
            sync_peers,
13✔
93
            hooks: Default::default(),
13✔
94
            local_cached_metadata: local_metadata,
13✔
95
            peer_ban_manager,
13✔
96
        }
13✔
97
    }
13✔
98

99
    pub fn on_starting<H>(&mut self, hook: H)
13✔
100
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
13✔
101
        self.hooks.add_on_starting_hook(hook);
13✔
102
    }
13✔
103

104
    pub fn on_progress<H>(&mut self, hook: H)
13✔
105
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
13✔
106
        self.hooks.add_on_progress_header_hook(hook);
13✔
107
    }
13✔
108

109
    pub fn on_rewind<H>(&mut self, hook: H)
13✔
110
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
13✔
111
        self.hooks.add_on_rewind_hook(hook);
13✔
112
    }
13✔
113

114
    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
13✔
115
        debug!(target: LOG_TARGET, "Starting header sync.",);
13✔
116

117
        info!(
13✔
118
            target: LOG_TARGET,
×
119
            "Synchronizing headers ({} candidate peers selected)",
120
            self.sync_peers.len()
×
121
        );
122

123
        // Hold `Arc<NodeId>` sync-list handles for each candidate peer. While these guards are
124
        // alive the connectivity manager marks the peers as "in use by sync", which prevents
125
        // opportunistic disconnects (e.g. DhtConnectivity random-pool pruning). The guards are
126
        // dropped automatically when this function returns.
127
        let mut _sync_guards: Vec<Arc<NodeId>> = Vec::with_capacity(self.sync_peers.len());
13✔
128
        for peer in self.sync_peers.iter() {
13✔
129
            match self.connectivity.add_peer_to_sync_list(peer.node_id().clone()).await {
13✔
130
                Ok(handle) => _sync_guards.push(handle),
13✔
131
                Err(e) => debug!(
×
132
                    target: LOG_TARGET,
×
133
                    "Failed to register sync peer {} on sync list: {e}", peer.node_id()
×
134
                ),
135
            }
136
        }
137

138
        self.synchronize_inner().await
13✔
139
    }
13✔
140

141
    async fn synchronize_inner(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
13✔
142
        let mut max_latency = self.config.initial_max_sync_latency;
13✔
143
        let mut latency_increases_counter = 0;
13✔
144
        loop {
145
            match self.try_sync_from_all_peers(max_latency).await {
13✔
146
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
12✔
147
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
×
148
                    // If we have few sync peers, throw this out to be retried later
149
                    if self.sync_peers.len() < 2 {
×
150
                        return Err(err);
×
151
                    }
×
152
                    max_latency += self.config.max_latency_increase;
×
153
                    latency_increases_counter += 1;
×
154
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
155
                        return Err(err);
×
156
                    }
×
157
                },
158
                Err(err) => break Err(err),
1✔
159
            }
160
        }
161
    }
13✔
162

163
    /// Try each candidate sync peer in turn, returning on the first success.
    ///
    /// Per failed peer: a ban is applied if the error is ban-able; peers that
    /// merely exceeded the latency budget are kept (and counted) so the caller
    /// can widen the budget, while all other failures remove the peer from the
    /// candidate list.
    #[allow(clippy::too_many_lines)]
    pub async fn try_sync_from_all_peers(
        &mut self,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        // Snapshot node ids up front: the loop body mutates `self.sync_peers`.
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync headers ({} sync peers)",
            sync_peer_node_ids.len()
        );
        // Counts peers that failed solely because latency exceeded the budget.
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
                Err(err) => {
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
                    if let Some(reason) = ban_reason {
                        warn!(target: LOG_TARGET, "{err}");
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    // Latency-only failures keep the peer as a candidate; any
                    // other failure drops it from the list.
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        // Distinguish "no peers left" / "all too slow" / "all failed" so the
        // caller can choose whether to widen the latency budget or abort.
        if self.sync_peers.is_empty() {
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
        }
    }
207

208
    /// Dial `node_id`, establish the sync RPC session, check the measured
    /// latency against `max_latency`, and run a header-sync attempt.
    async fn connect_and_attempt_sync(
        &mut self,
        node_id: &NodeId,
        max_latency: Duration,
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
        let peer_index = self
            .get_sync_peer_index(node_id)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
        self.hooks.call_on_starting_hook(sync_peer);

        let mut conn = self.dial_sync_peer(node_id).await?;
        debug!(
            target: LOG_TARGET,
            "Attempting to synchronize headers with `{node_id}`"
        );
        // Defensive: the connection may have been torn down by another subsystem between
        // dial returning and this point (e.g. DhtConnectivity pruning). This is not the peer's
        // fault, so use NotInSync (no ban) to trigger a skip-and-retry with the next peer.
        if !conn.is_connected() {
            warn!(
                target: LOG_TARGET,
                "Sync peer `{node_id}` was disconnected before RPC negotiation could begin"
            );
            return Err(BlockHeaderSyncError::NotInSync);
        }

        let config = RpcClient::builder()
            .with_deadline(self.config.rpc_deadline)
            .with_deadline_grace_period(Duration::from_secs(5));
        // Bound RPC negotiation so a stuck negotiation cannot wedge the sync loop.
        let mut client = tokio::time::timeout(
            self.config.rpc_deadline,
            conn.connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config),
        )
        .await
        .map_err(|_| BlockHeaderSyncError::RpcError(RpcError::ReplyTimeout))??;

        let latency = client
            .get_last_request_latency()
            .expect("unreachable panic: last request latency must be set after connect");
        // Record the latency on the peer even if it ends up exceeding the budget,
        // so later peer selection can use the measurement.
        self.sync_peers
            .get_mut(peer_index)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
            .set_latency(latency);
        if latency > max_latency {
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                peer: conn.peer_node_id().clone(),
                latency,
                max_latency,
            });
        }

        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
        // Re-fetch (and clone) the peer: the earlier borrow ended at set_latency.
        let sync_peer = self
            .sync_peers
            .get(peer_index)
            .ok_or(BlockHeaderSyncError::PeerNotFound)?
            .clone();
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
        Ok((sync_peer, sync_result))
    }
270

271
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
13✔
272
        let timer = Instant::now();
13✔
273
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
13✔
274
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
13✔
275
        info!(
13✔
276
            target: LOG_TARGET,
×
277
            "Successfully dialed sync peer {} in {:.2?}",
278
            node_id,
279
            timer.elapsed()
×
280
        );
281
        Ok(conn)
13✔
282
    }
13✔
283

284
    /// Run one header-sync attempt against `sync_peer` over an established RPC
    /// client: snapshot local chain state, determine the sync status relative
    /// to the peer, and (if lagging) download and apply the missing headers.
    async fn attempt_sync(
        &mut self,
        sync_peer: &SyncPeer,
        mut client: rpc::BaseNodeSyncRpcClient,
        max_latency: Duration,
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
            sync_peer.node_id(),
            latency.unwrap_or_default().as_millis()
        );

        // Fetch best local data at the beginning of the sync process
        let best_block_metadata = self.db.get_chain_metadata().await?;
        let best_header = self.db.fetch_last_chain_header().await?;
        let best_block_header = self
            .db
            .fetch_chain_header(best_block_metadata.best_block_height())
            .await?;
        let best_header_height = best_header.height();
        let best_block_height = best_block_header.height();

        // Sanity check: the header chain can never be shorter than the block
        // chain, and the block tip must be at least the cached metadata height.
        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
        {
            return Err(BlockHeaderSyncError::ChainStorageError(
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
            ));
        }

        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
        //   peer, but they claimed better POW before we attempted sync.
        // - This method will return ban-able errors for certain offenses.
        let (header_sync_status, peer_response) = self
            .determine_sync_status(
                sync_peer,
                best_header.clone(),
                best_block_header.clone(),
                self.config.max_reorg_depth_allowed,
                &mut client,
            )
            .await?;

        match header_sync_status.clone() {
            HeaderSyncStatus::InSyncOrAhead => {
                debug!(
                    target: LOG_TARGET,
                    "Headers are in sync at height {best_header_height} but tip is {best_block_height}. Proceeding to archival/pruned block sync"
                );

                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
            HeaderSyncStatus::Lagging(split_info) => {
                // Report the effective starting height (tip minus reorg depth)
                // before downloading the remaining headers.
                self.hooks.call_on_progress_header_hooks(
                    split_info
                        .best_block_header
                        .height()
                        .saturating_sub(split_info.reorg_steps_back),
                    sync_peer.claimed_chain_metadata().best_block_height(),
                    sync_peer,
                );
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
                    .await?;
                Ok(AttemptSyncResult {
                    headers_returned: peer_response.peer_headers.len() as u64,
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
                    header_sync_status,
                })
            },
        }
    }
360

361
    #[allow(clippy::too_many_lines)]
362
    async fn find_chain_split(
13✔
363
        &mut self,
13✔
364
        peer_node_id: &NodeId,
13✔
365
        max_reorg_depth_allowed: usize,
13✔
366
        client: &mut rpc::BaseNodeSyncRpcClient,
13✔
367
        header_count: u64,
13✔
368
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
13✔
369
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
370
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
371
        // and keep us busy going back until the genesis.
372
        // 20 x 500 = max 10,000 block split can be detected. The 10_000 limit is default, but can be overridden
373
        let max_chain_split_iters = max_reorg_depth_allowed.saturating_div(NUM_CHAIN_SPLIT_HEADERS);
13✔
374

375
        let mut offset = 0;
13✔
376
        let mut iter_count = 0;
13✔
377
        loop {
378
            iter_count += 1;
13✔
379
            if iter_count > max_chain_split_iters {
13✔
380
                warn!(
×
381
                    target: LOG_TARGET,
×
382
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
383
                    peer_node_id,
384
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
385
                );
386
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
387
            }
13✔
388

389
            let block_hashes = self
13✔
390
                .db
13✔
391
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
13✔
392
                .await?;
13✔
393
            debug!(
13✔
394
                target: LOG_TARGET,
×
395
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
396
                offset,
397
                offset + NUM_CHAIN_SPLIT_HEADERS,
×
398
                peer_node_id,
399
                block_hashes.len()
×
400
            );
401

402
            // No further hashes to send.
403
            if block_hashes.is_empty() {
13✔
404
                warn!(
×
405
                    target: LOG_TARGET,
×
406
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
407
                    peer_node_id,
408
                    NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
409
                );
410
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
411
            }
13✔
412

413
            let request = FindChainSplitRequest {
13✔
414
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
56✔
415
                header_count,
13✔
416
            };
417

418
            let resp = match client.find_chain_split(request).await {
13✔
419
                Ok(r) => r,
13✔
420
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
×
421
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
422
                    // send. Exit early in this case.
423
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
×
424
                        warn!(
×
425
                            target: LOG_TARGET,
×
426
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
427
                            peer_node_id,
428
                            NUM_CHAIN_SPLIT_HEADERS * max_chain_split_iters,
×
429
                        );
430
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
431
                    }
×
432
                    // Chain split not found, let's go further back
433
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
×
434
                    continue;
×
435
                },
436
                Err(err) => {
×
437
                    return Err(err.into());
×
438
                },
439
            };
440
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
13✔
441
                warn!(
×
442
                    target: LOG_TARGET,
×
443
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
444
                    peer_node_id,
445
                    resp.headers.len(),
×
446
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
447
                );
448
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
×
449
            }
13✔
450
            if resp.fork_hash_index >= block_hashes.len() as u64 {
13✔
451
                warn!(
×
452
                    target: LOG_TARGET,
×
453
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
454
                    peer_node_id,
455
                    resp.fork_hash_index,
456
                    block_hashes.len(),
×
457
                );
458
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
×
459
                    block_hashes.len() as u64,
×
460
                    resp.fork_hash_index,
×
461
                ));
×
462
            }
13✔
463
            #[allow(clippy::cast_possible_truncation)]
464
            if !resp.headers.is_empty() &&
13✔
465
                *resp.headers.first().expect("Already checked").prev_hash !=
9✔
466
                    *block_hashes
9✔
467
                        .get(resp.fork_hash_index as usize)
9✔
468
                        .expect("Already checked")
9✔
469
            {
470
                warn!(
×
471
                    target: LOG_TARGET,
×
472
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
473
                    peer_node_id,
474
                    resp.fork_hash_index,
475
                );
476
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
×
477
                    "Peer sent incorrect fork hash index".into(),
×
478
                ));
×
479
            }
13✔
480
            #[allow(clippy::cast_possible_truncation)]
481
            let chain_split_hash = *block_hashes
13✔
482
                .get(resp.fork_hash_index as usize)
13✔
483
                .expect("Already checked");
13✔
484

485
            return Ok(FindChainSplitResult {
13✔
486
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
13✔
487
                peer_headers: resp.headers,
13✔
488
                peer_fork_hash_index: resp.fork_hash_index,
13✔
489
                chain_split_hash,
13✔
490
            });
13✔
491
        }
492
    }
13✔
493

494
    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
    /// of the chain split (see [HeaderSyncStatus]).
    ///
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
    async fn determine_sync_status(
        &mut self,
        sync_peer: &SyncPeer,
        best_header: ChainHeader,
        best_block_header: ChainHeader,
        max_reorg_depth_allowed: usize,
        client: &mut rpc::BaseNodeSyncRpcClient,
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
        // This method will return ban-able errors for certain offenses.
        let chain_split_result = self
            .find_chain_split(
                sync_peer.node_id(),
                max_reorg_depth_allowed,
                client,
                HEADER_SYNC_INITIAL_MAX_HEADERS as u64,
            )
            .await?;
        if chain_split_result.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Found chain split {} blocks back, received {} headers from peer `{}`",
                chain_split_result.reorg_steps_back,
                chain_split_result.peer_headers.len(),
                sync_peer
            );
        }

        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
        // accumulated difficulty.
        if chain_split_result.peer_headers.is_empty() {
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
            // we can ban the peer.
            if best_header.height() == best_block_header.height() {
                warn!(
                    target: LOG_TARGET,
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
                    sync_peer.node_id(),
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    best_block_header.accumulated_data().total_accumulated_difficulty,
                );
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: None,
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
                });
            }
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
        }

        // Convert the peer's protobuf headers; any malformed header is a
        // ban-able protocol violation.
        let headers = chain_split_result
            .peer_headers
            .clone()
            .into_iter()
            .map(BlockHeader::try_from)
            .collect::<Result<Vec<_>, _>>()
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
        let num_new_headers = headers.len();
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
        // the list
        if self
            .db
            .fetch_header_by_block_hash(headers.first().expect("Already checked").hash())
            .await?
            .is_some()
        {
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                "Header already in database".to_string(),
            ));
        };

        // Seed the validator at the split point, then validate the preliminary
        // headers in order; validation failure propagates to the caller.
        self.header_validator
            .initialize_state(&chain_split_result.chain_split_hash)
            .await?;
        for header in headers {
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({})",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
            );
            self.header_validator.validate(header).await?;
        }

        debug!(
            target: LOG_TARGET,
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
        );

        let chain_split_info = ChainSplitInfo {
            best_block_header,
            reorg_steps_back: chain_split_result.reorg_steps_back,
            chain_split_hash: chain_split_result.chain_split_hash,
        };
        Ok((
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
            chain_split_result,
        ))
    }
602

603
    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
1✔
604
        debug!(
1✔
605
            target: LOG_TARGET,
×
606
            "Deleting headers that no longer form part of the main chain up until split at {}",
607
            split_hash.to_hex()
×
608
        );
609

610
        let blocks = self.db.rewind_to_hash(split_hash).await?;
1✔
611
        debug!(
1✔
612
            target: LOG_TARGET,
×
613
            "Rewound {} block(s) in preparation for header sync",
614
            blocks.len()
×
615
        );
616
        Ok(blocks)
1✔
617
    }
1✔
618

619
    /// Downloads and validates the remainder of the peer's headers after the initial
    /// chain-split batch, switching the local node over to the remote chain once it
    /// proves stronger.
    ///
    /// Flow:
    /// 1. If the already-validated pending headers outweigh the local tip, switch immediately.
    /// 2. If the initial batch was short (fewer than `HEADER_SYNC_INITIAL_MAX_HEADERS`),
    ///    the peer has no more headers: succeed if we switched, otherwise the peer
    ///    overstated its claimed difficulty and an error is returned.
    /// 3. Otherwise stream the remaining headers to the tip, validating each, committing
    ///    in batches of `COMMIT_EVERY_N_HEADERS` once switched, and enforcing `max_latency`
    ///    on the rolling average response time.
    /// 4. Finally cross-check the peer's claimed accumulated difficulty against what its
    ///    headers actually provided.
    ///
    /// # Errors
    /// - `ReceivedInvalidHeader` for malformed, out-of-sequence or duplicate headers.
    /// - `MaxLatencyExceeded` when the rolling average latency exceeds `max_latency`.
    /// - `PeerSentInaccurateChainMetadata` when the peer's claimed PoW cannot be
    ///   substantiated by the headers it supplied.
    #[allow(clippy::too_many_lines)]
    async fn synchronize_headers(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        split_info: ChainSplitInfo,
        max_latency: Duration,
    ) -> Result<(), BlockHeaderSyncError> {
        info!(target: LOG_TARGET, "Starting header sync from peer {sync_peer}");
        const COMMIT_EVERY_N_HEADERS: usize = 1000;

        let mut has_switched_to_new_chain = false;
        let pending_len = self.header_validator.valid_headers().len();

        // Find the hash to start syncing the rest of the headers.
        // The expectation cannot fail because there has been at least one valid header returned (checked in
        // determine_sync_status)
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
            .header_validator
            .current_valid_chain_tip_header()
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");

        // If we already have a stronger chain at this point, switch over to it.
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);

        if has_better_pow {
            debug!(
                target: LOG_TARGET,
                "Remote chain from peer {} has higher PoW. Switching",
                sync_peer.node_id()
            );
            self.switch_to_pending_chain(&split_info).await?;
            has_switched_to_new_chain = true;
        }

        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
            // Peer returned less than the max number of requested headers. This indicates that we have all the
            // available headers from the peer.
            if !has_better_pow {
                // Because the pow is less or equal than the current chain the peer had to have lied about their pow
                debug!(target: LOG_TARGET, "No further headers to download");
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: Some(total_accumulated_difficulty),
                    local: split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty,
                });
            }
            // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
            // to block sync.
            return Ok(());
        }

        debug!(
            target: LOG_TARGET,
            "Download remaining headers starting from header #{} from peer `{}`",
            start_header_height,
            sync_peer.node_id()
        );
        let request = SyncHeadersRequest {
            start_hash: start_header_hash.to_vec(),
            // To the tip!
            count: 0,
        };

        let mut header_stream = client.sync_headers(request).await?;
        debug!(
            target: LOG_TARGET,
            "Reading headers from peer `{}`",
            sync_peer.node_id()
        );

        let mut last_sync_timer = Instant::now();

        // Tracks the accumulated difficulty of the most recently validated header;
        // used for the final over-claim check after the stream ends.
        let mut last_total_accumulated_difficulty = U512::zero();
        let mut avg_latency = RollingAverageTime::new(20);
        let mut prev_height: Option<u64> = None;
        // Stream and validate the remaining headers one at a time.
        while let Some(header) = header_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
                latency
            );
            trace!(
                target: LOG_TARGET,
                "{header}"
            );
            // Heights must be strictly consecutive; anything else is an invalid stream.
            if let Some(prev_header_height) = prev_height &&
                header.height != prev_header_height.saturating_add(1)
            {
                warn!(
                    target: LOG_TARGET,
                    "Received header #{} `{}` does not follow previous header",
                    header.height,
                    header.hash().to_hex()
                );
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                    "Header does not follow previous header".to_string(),
                ));
            }
            // A header we already have stored is also treated as invalid input.
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
            if let Some(h) = existing_header {
                warn!(
                    target: LOG_TARGET,
                    "Received header #{} `{}` that we already have.",
                    h.height,
                    h.hash().to_hex()
                );
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                    "Header already in database".to_string(),
                ));
            }
            let current_height = header.height;
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;

            if has_switched_to_new_chain {
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
                    self.commit_pending_headers().await?;
                }
            } else {
                // The remote chain has not (yet) been accepted.
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
                // achieved.
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                    self.switch_to_pending_chain(&split_info).await?;
                    has_switched_to_new_chain = true;
                }
            }

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            self.hooks.call_on_progress_header_hooks(
                current_height,
                sync_peer.claimed_chain_metadata().best_block_height(),
                &sync_peer,
            );

            // Abort when the rolling average latency (minimum 5 samples) exceeds the cap.
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
            if let Some(avg_latency) = last_avg_latency &&
                avg_latency > max_latency
            {
                return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                    peer: sync_peer.node_id().clone(),
                    latency: avg_latency,
                    max_latency,
                });
            }

            last_sync_timer = Instant::now();
            prev_height = Some(current_height);
        }

        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
        if !has_switched_to_new_chain {
            let best_local_before_sync = split_info
                .best_block_header
                .accumulated_data()
                .total_accumulated_difficulty;
            match self
                .header_validator
                .current_valid_chain_tip_header()
                .map(|h| h.accumulated_data().total_accumulated_difficulty)
            {
                Some(validated_total_accumulated_diff) => {
                    if claimed_total_accumulated_diff > validated_total_accumulated_diff {
                        // Over-claim: peer advertised more PoW than their headers actually provide.
                        return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                            claimed: claimed_total_accumulated_diff,
                            actual: Some(validated_total_accumulated_diff),
                            local: best_local_before_sync,
                        });
                    } else if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                        self.switch_to_pending_chain(&split_info).await?;
                        has_switched_to_new_chain = true;
                        info!(
                            target: LOG_TARGET,
                            "Received PoW from peer exceeds local tip. Before sync: {}, received: {}. Committed.",
                            best_local_before_sync,
                            validated_total_accumulated_diff,
                        );
                    } else {
                        // We have a stronger chain, so we do not commit the headers.
                        debug!(
                            target: LOG_TARGET,
                            "Not committing headers as we have a stronger chain, ours: {} theirs: {}",
                            best_local_before_sync,
                            validated_total_accumulated_diff,
                        );
                    };
                },
                None => {
                    // No validated headers at this stage, but there should have been.
                    return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                        claimed: claimed_total_accumulated_diff,
                        actual: None,
                        local: best_local_before_sync,
                    });
                },
            }
        }

        // Commit the last blocks only if we have switched to the new chain.
        if has_switched_to_new_chain && !self.header_validator.valid_headers().is_empty() {
            self.commit_pending_headers().await?;
        }

        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
        // some other external factor like a disconnect etc), we detect this and ban the peer.
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                claimed: claimed_total_accumulated_diff,
                actual: Some(last_total_accumulated_difficulty),
                local: split_info
                    .best_block_header
                    .accumulated_data()
                    .total_accumulated_difficulty,
            });
        }

        Ok(())
    }
9✔
851

852
    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
9✔
853
        let chain_headers = self.header_validator.take_valid_headers();
9✔
854
        let num_headers = chain_headers.len();
9✔
855
        let start = Instant::now();
9✔
856

857
        let new_tip = chain_headers.last().cloned().unwrap();
9✔
858
        let mut txn = self.db.write_transaction();
9✔
859
        chain_headers.into_iter().for_each(|chain_header| {
32✔
860
            txn.insert_chain_header(chain_header);
32✔
861
        });
32✔
862

863
        txn.commit().await?;
9✔
864

865
        debug!(
9✔
866
            target: LOG_TARGET,
×
867
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
868
            num_headers,
869
            new_tip.height(),
×
870
            start.elapsed()
×
871
        );
872

873
        Ok(new_tip)
9✔
874
    }
9✔
875

876
    /// Returns true when the validated-but-uncommitted (pending) chain carries strictly
    /// more accumulated PoW than `current_tip`. An empty pending chain, or one of equal
    /// strength, is treated as not higher (equal must not trigger a chain switch).
    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
        // `last()` yields `None` for an empty pending chain, which can never win;
        // otherwise compare the local tip against the proposed tip and require a
        // strict "less than" for the local side.
        self.header_validator
            .valid_headers()
            .last()
            .map(|proposed_tip| self.header_validator.compare_chains(current_tip, proposed_tip).is_lt())
            .unwrap_or(false)
    }
9✔
887

888
    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
9✔
889
        // Reorg if required
890
        if split_info.reorg_steps_back > 0 {
9✔
891
            debug!(
1✔
892
                target: LOG_TARGET,
×
893
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
894
                split_info.reorg_steps_back,
895
                split_info.chain_split_hash.to_hex()
×
896
            );
897
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
1✔
898
            if !blocks.is_empty() {
1✔
899
                self.hooks.call_on_rewind_hooks(blocks);
1✔
900
            }
1✔
901
        }
8✔
902

903
        // Commit the forked chain. At this point
904
        // 1. Headers have been validated
905
        // 2. The forked chain has a higher PoW than the local chain
906
        //
907
        // After this we commit headers every `n` blocks
908
        self.commit_pending_headers().await?;
9✔
909

910
        Ok(())
9✔
911
    }
9✔
912

913
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
914
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
1✔
915
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
1✔
916
            self.sync_peers.remove(pos);
1✔
917
        }
1✔
918
    }
1✔
919

920
    // Helper function to get the index to the node_id inside of the vec of peers
921
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
13✔
922
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
13✔
923
    }
13✔
924
}
925

926
/// Result of locating the point where the local chain and a remote peer's chain
/// diverge, together with the first batch of headers the peer sent past that point.
#[derive(Debug, Clone)]
struct FindChainSplitResult {
    /// Number of local blocks that must be rewound to get back to the split point.
    reorg_steps_back: u64,
    /// Headers returned by the peer, still in wire (`ProtoBlockHeader`) form.
    peer_headers: Vec<ProtoBlockHeader>,
    /// Fork hash index reported by the peer. NOTE(review): presumably the index into
    /// the locally supplied block hashes at which the peer found a match — confirm
    /// against the chain-split RPC request.
    peer_fork_hash_index: u64,
    /// Hash of the block at the split, i.e. the last block common to both chains.
    chain_split_hash: HashOutput,
}
933

934
/// Information about the chain split from the remote node, used to decide whether
/// (and how far) to reorg when switching to the remote chain.
#[derive(Debug, Clone, PartialEq)]
pub struct ChainSplitInfo {
    /// The best block's header on the local chain.
    pub best_block_header: ChainHeader,
    /// The number of blocks the local chain must be rewound to reach the fork.
    pub reorg_steps_back: u64,
    /// The hash of the block at the fork.
    pub chain_split_hash: HashOutput,
}
944

945
/// The result of an attempt to synchronize headers with a peer.
#[derive(Debug, Clone, PartialEq)]
pub struct AttemptSyncResult {
    /// The number of headers that were returned by the peer.
    pub headers_returned: u64,
    /// The fork hash index reported by the remote peer. NOTE(review): presumably the
    /// index into the locally supplied block hashes at which the peer found a match —
    /// confirm against the chain-split RPC.
    pub peer_fork_hash_index: u64,
    /// The header sync status after the attempt (in sync/ahead, or lagging with
    /// the chain-split details).
    pub header_sync_status: HeaderSyncStatus,
}
955

956
/// Outcome of comparing the local header chain against a sync peer's chain.
#[derive(Debug, Clone, PartialEq)]
pub enum HeaderSyncStatus {
    /// Local and remote node are in sync or ahead; no header sync is required.
    InSyncOrAhead,
    /// Local node is lagging behind remote node; carries the chain-split details
    /// needed to continue syncing (boxed to keep the enum small).
    Lagging(Box<ChainSplitInfo>),
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc