
tari-project / tari / 15853103462

24 Jun 2025 02:21PM UTC coverage: 71.683% (-0.7%) from 72.398%
feat: offline signing (#7122)

Description
---

Adds the following features.

### CLI `--skip-recovery` option

Cold wallets need to run in environments without an Internet connection,
but a console wallet created from seed words normally requires a full
initial recovery scan before any other functionality is available.
This switch allows the recovery step to be skipped.
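On the cold machine, the wallet created from seed words can then be started without network access. A minimal sketch (any flags besides `--skip-recovery` are omitted, and a real invocation would typically include configuration options):

```sh
# Cold side: start the console wallet created from seed words without
# performing the initial online recovery scan.
minotari_console_wallet --skip-recovery
```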

### "prepare-one-sided-transaction-for-signing" CLI command

```sh
minotari_console_wallet prepare-one-sided-transaction-for-signing --output-file <unsigned_tx_file> <amount> <recipient_address>
```

Intended to be run on the _hot side_. This creates an unsigned
transaction request, which must then be signed on the cold side.

### "sign-one-sided-transaction" CLI command

```sh
minotari_console_wallet sign-one-sided-transaction --input-file <unsigned_tx_file> --output-file <signed_tx_file>
```

Intended to be run on the _cold side_. Signs the transaction using the
**private spend key**.

### "broadcast-signed-one-sided-transaction" CLI command

```sh
minotari_console_wallet broadcast-signed-one-sided-transaction --input-file <signed_tx_file>
```

Intended to be run on the _hot side_. Broadcasts the signed transaction
to the Tari network (mempool).

### gRPC methods to be run on the _hot side_

* PrepareOneSidedTransactionForSigning
* BroadcastSignedOneSidedTransaction
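As a sketch of how an integration service might call one of these methods with `grpcurl` (the service path `tari.rpc.Wallet`, the port, and the request field names are assumptions for illustration, not confirmed by this PR):

```sh
# Hypothetical: ask the hot wallet's gRPC interface to prepare an unsigned
# one-sided transaction. Service path, port, and field names are illustrative.
grpcurl -plaintext \
    -d '{"recipient_address": "<recipient_address>", "amount": 1000000}' \
    localhost:18143 tari.rpc.Wallet/PrepareOneSidedTransactionForSigning
```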

Motivation and Context
---

Exchanges are requesting a way to perform offline signing.
Essentially, they want two wallets:
- a hot wallet, which holds the view key and the public spend key
- a cold wallet, which holds the private spend key and is never connected to the internet

The signing process would look as follows.
1. The hot wallet is requested to create an unsigned transaction (from a
recipient address and an amount). There will be an option to use the CLI
to create a file containing the unsigned transaction.
2. The unsigned transaction is air-gap transferred to the cold wallet,
where it is signed via the CLI. The output is another file containing the
signed transaction.
3. Signed transaction file is transferred to hot walle... (continued)
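Putting the three CLI commands above together, the full round trip might look like this (a sketch; the amount and file names are placeholders, and the air-gapped transfers between machines are represented by comments):

```sh
# 1. Hot side: create the unsigned transaction request.
minotari_console_wallet prepare-one-sided-transaction-for-signing \
    --output-file unsigned_tx.json 1000000 <recipient_address>

# (transfer unsigned_tx.json to the cold machine, e.g. via USB drive)

# 2. Cold side: sign the transaction with the private spend key.
minotari_console_wallet sign-one-sided-transaction \
    --input-file unsigned_tx.json --output-file signed_tx.json

# (transfer signed_tx.json back to the hot machine)

# 3. Hot side: broadcast the signed transaction to the Tari network.
minotari_console_wallet broadcast-signed-one-sided-transaction \
    --input-file signed_tx.json
```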

0 of 851 new or added lines in 9 files covered. (0.0%)

301 existing lines in 23 files now uncovered.

82846 of 115572 relevant lines covered (71.68%)

239176.69 hits per line

Source File

50.65% — /base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{
23
    convert::TryFrom,
24
    sync::Arc,
25
    time::{Duration, Instant},
26
};
27

28
use futures::StreamExt;
29
use log::*;
30
use primitive_types::U512;
31
use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput};
32
use tari_comms::{
33
    connectivity::ConnectivityRequester,
34
    peer_manager::NodeId,
35
    protocol::rpc::{RpcClient, RpcError},
36
    PeerConnection,
37
};
38
use tari_utilities::hex::Hex;
39

40
use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
41
use crate::{
42
    base_node::sync::{
43
        ban::PeerBanManager,
44
        header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS,
45
        hooks::Hooks,
46
        rpc,
47
        BlockchainSyncConfig,
48
        SyncPeer,
49
    },
50
    blocks::{BlockHeader, ChainBlock, ChainHeader},
51
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
52
    common::{rolling_avg::RollingAverageTime, BanPeriod},
53
    consensus::ConsensusManager,
54
    proof_of_work::randomx_factory::RandomXFactory,
55
    proto::{
56
        base_node::{FindChainSplitRequest, SyncHeadersRequest},
57
        core::BlockHeader as ProtoBlockHeader,
58
    },
59
};
60

61
const LOG_TARGET: &str = "c::bn::header_sync";
62

63
const MAX_LATENCY_INCREASES: usize = 5;
64

65
pub struct HeaderSynchronizer<'a, B> {
66
    config: BlockchainSyncConfig,
67
    db: AsyncBlockchainDb<B>,
68
    header_validator: BlockHeaderSyncValidator<B>,
69
    connectivity: ConnectivityRequester,
70
    sync_peers: &'a mut Vec<SyncPeer>,
71
    hooks: Hooks,
72
    local_cached_metadata: &'a ChainMetadata,
73
    peer_ban_manager: PeerBanManager,
74
}
75

76
impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
77
    pub fn new(
12✔
78
        config: BlockchainSyncConfig,
12✔
79
        db: AsyncBlockchainDb<B>,
12✔
80
        consensus_rules: ConsensusManager,
12✔
81
        connectivity: ConnectivityRequester,
12✔
82
        sync_peers: &'a mut Vec<SyncPeer>,
12✔
83
        randomx_factory: RandomXFactory,
12✔
84
        local_metadata: &'a ChainMetadata,
12✔
85
    ) -> Self {
12✔
86
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
12✔
87
        Self {
12✔
88
            config,
12✔
89
            header_validator: BlockHeaderSyncValidator::new(db.clone(), consensus_rules, randomx_factory),
12✔
90
            db,
12✔
91
            connectivity,
12✔
92
            sync_peers,
12✔
93
            hooks: Default::default(),
12✔
94
            local_cached_metadata: local_metadata,
12✔
95
            peer_ban_manager,
12✔
96
        }
12✔
97
    }
12✔
98

99
    pub fn on_starting<H>(&mut self, hook: H)
12✔
100
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
12✔
101
        self.hooks.add_on_starting_hook(hook);
12✔
102
    }
12✔
103

104
    pub fn on_progress<H>(&mut self, hook: H)
12✔
105
    where H: Fn(u64, u64, &SyncPeer) + Send + Sync + 'static {
12✔
106
        self.hooks.add_on_progress_header_hook(hook);
12✔
107
    }
12✔
108

109
    pub fn on_rewind<H>(&mut self, hook: H)
12✔
110
    where H: Fn(Vec<Arc<ChainBlock>>) + Send + Sync + 'static {
12✔
111
        self.hooks.add_on_rewind_hook(hook);
12✔
112
    }
12✔
113

114
    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
12✔
115
        debug!(target: LOG_TARGET, "Starting header sync.",);
12✔
116

117
        info!(
12✔
118
            target: LOG_TARGET,
×
119
            "Synchronizing headers ({} candidate peers selected)",
×
120
            self.sync_peers.len()
×
121
        );
122
        let mut max_latency = self.config.initial_max_sync_latency;
12✔
123
        let mut latency_increases_counter = 0;
12✔
124
        loop {
125
            match self.try_sync_from_all_peers(max_latency).await {
12✔
126
                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
2✔
127
                Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
×
128
                    // If we have few sync peers, throw this out to be retried later
×
129
                    if self.sync_peers.len() < 2 {
×
130
                        return Err(err);
×
131
                    }
×
132
                    max_latency += self.config.max_latency_increase;
×
133
                    latency_increases_counter += 1;
×
134
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
135
                        return Err(err);
×
136
                    }
×
137
                },
138
                Err(err) => break Err(err),
10✔
139
            }
140
        }
141
    }
12✔
142

143
    #[allow(clippy::too_many_lines)]
144
    pub async fn try_sync_from_all_peers(
12✔
145
        &mut self,
12✔
146
        max_latency: Duration,
12✔
147
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
12✔
148
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
12✔
149
        info!(
12✔
150
            target: LOG_TARGET,
×
151
            "Attempting to sync headers ({} sync peers)",
×
152
            sync_peer_node_ids.len()
×
153
        );
154
        let mut latency_counter = 0usize;
12✔
155
        for node_id in sync_peer_node_ids {
22✔
156
            match self.connect_and_attempt_sync(&node_id, max_latency).await {
12✔
157
                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
2✔
158
                Err(err) => {
10✔
159
                    let ban_reason = BlockHeaderSyncError::get_ban_reason(&err);
10✔
160
                    if let Some(reason) = ban_reason {
10✔
161
                        warn!(target: LOG_TARGET, "{}", err);
×
162
                        let duration = match reason.ban_duration {
×
163
                            BanPeriod::Short => self.config.short_ban_period,
×
164
                            BanPeriod::Long => self.config.ban_period,
×
165
                        };
166
                        self.peer_ban_manager
×
167
                            .ban_peer_if_required(&node_id, reason.reason, duration)
×
168
                            .await;
×
169
                    }
10✔
170
                    if let BlockHeaderSyncError::MaxLatencyExceeded { .. } = err {
10✔
171
                        latency_counter += 1;
×
172
                    } else {
10✔
173
                        self.remove_sync_peer(&node_id);
10✔
174
                    }
10✔
175
                },
176
            }
177
        }
178

179
        if self.sync_peers.is_empty() {
10✔
180
            Err(BlockHeaderSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
10✔
181
        } else if latency_counter >= self.sync_peers.len() {
×
182
            Err(BlockHeaderSyncError::AllSyncPeersExceedLatency)
×
183
        } else {
184
            Err(BlockHeaderSyncError::SyncFailedAllPeers)
×
185
        }
186
    }
12✔
187

188
    async fn connect_and_attempt_sync(
12✔
189
        &mut self,
12✔
190
        node_id: &NodeId,
12✔
191
        max_latency: Duration,
12✔
192
    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
12✔
193
        let peer_index = self
12✔
194
            .get_sync_peer_index(node_id)
12✔
195
            .ok_or(BlockHeaderSyncError::PeerNotFound)?;
12✔
196
        let sync_peer = &self.sync_peers[peer_index];
12✔
197
        self.hooks.call_on_starting_hook(sync_peer);
12✔
198

199
        let mut conn = self.dial_sync_peer(node_id).await?;
12✔
200
        debug!(
2✔
201
            target: LOG_TARGET,
×
202
            "Attempting to synchronize headers with `{}`", node_id
×
203
        );
204

205
        let config = RpcClient::builder()
2✔
206
            .with_deadline(self.config.rpc_deadline)
2✔
207
            .with_deadline_grace_period(Duration::from_secs(5));
2✔
208
        let mut client = conn
2✔
209
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
2✔
210
            .await?;
2✔
211

212
        let latency = client
2✔
213
            .get_last_request_latency()
2✔
214
            .expect("unreachable panic: last request latency must be set after connect");
2✔
215
        self.sync_peers[peer_index].set_latency(latency);
2✔
216
        if latency > max_latency {
2✔
217
            return Err(BlockHeaderSyncError::MaxLatencyExceeded {
×
218
                peer: conn.peer_node_id().clone(),
×
219
                latency,
×
220
                max_latency,
×
221
            });
×
222
        }
2✔
223

2✔
224
        debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
2✔
225
        let sync_peer = self.sync_peers[peer_index].clone();
2✔
226
        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
2✔
227
        Ok((sync_peer, sync_result))
2✔
228
    }
12✔
229

230
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
12✔
231
        let timer = Instant::now();
12✔
232
        debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
12✔
233
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
12✔
234
        info!(
2✔
235
            target: LOG_TARGET,
×
236
            "Successfully dialed sync peer {} in {:.2?}",
×
237
            node_id,
×
238
            timer.elapsed()
×
239
        );
240
        Ok(conn)
2✔
241
    }
12✔
242

243
    async fn attempt_sync(
2✔
244
        &mut self,
2✔
245
        sync_peer: &SyncPeer,
2✔
246
        mut client: rpc::BaseNodeSyncRpcClient,
2✔
247
        max_latency: Duration,
2✔
248
    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
2✔
249
        let latency = client.get_last_request_latency();
2✔
250
        debug!(
2✔
251
            target: LOG_TARGET,
×
252
            "Initiating header sync with peer `{}` (sync latency = {}ms)",
×
253
            sync_peer.node_id(),
×
254
            latency.unwrap_or_default().as_millis()
×
255
        );
256

257
        // Fetch best local data at the beginning of the sync process
258
        let best_block_metadata = self.db.get_chain_metadata().await?;
2✔
259
        let best_header = self.db.fetch_last_chain_header().await?;
2✔
260
        let best_block_header = self
2✔
261
            .db
2✔
262
            .fetch_chain_header(best_block_metadata.best_block_height())
2✔
263
            .await?;
2✔
264
        let best_header_height = best_header.height();
2✔
265
        let best_block_height = best_block_header.height();
2✔
266

2✔
267
        if best_header_height < best_block_height || best_block_height < self.local_cached_metadata.best_block_height()
2✔
268
        {
269
            return Err(BlockHeaderSyncError::ChainStorageError(
×
270
                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
×
271
            ));
×
272
        }
2✔
273

274
        // - At this point we may have more (InSyncOrAhead), equal (InSyncOrAhead), or less headers (Lagging) than the
275
        //   peer, but they claimed better POW before we attempted sync.
276
        // - This method will return ban-able errors for certain offenses.
277
        let (header_sync_status, peer_response) = self
2✔
278
            .determine_sync_status(sync_peer, best_header.clone(), best_block_header.clone(), &mut client)
2✔
279
            .await?;
2✔
280

281
        match header_sync_status.clone() {
2✔
282
            HeaderSyncStatus::InSyncOrAhead => {
283
                debug!(
1✔
284
                    target: LOG_TARGET,
×
285
                    "Headers are in sync at height {} but tip is {}. Proceeding to archival/pruned block sync",
×
286
                    best_header_height,
287
                    best_block_height
288
                );
289

290
                Ok(AttemptSyncResult {
1✔
291
                    headers_returned: peer_response.peer_headers.len() as u64,
1✔
292
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
1✔
293
                    header_sync_status,
1✔
294
                })
1✔
295
            },
296
            HeaderSyncStatus::Lagging(split_info) => {
1✔
297
                self.hooks.call_on_progress_header_hooks(
1✔
298
                    split_info
1✔
299
                        .best_block_header
1✔
300
                        .height()
1✔
301
                        .checked_sub(split_info.reorg_steps_back)
1✔
302
                        .unwrap_or_default(),
1✔
303
                    sync_peer.claimed_chain_metadata().best_block_height(),
1✔
304
                    sync_peer,
1✔
305
                );
1✔
306
                self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
1✔
307
                    .await?;
1✔
308
                Ok(AttemptSyncResult {
1✔
309
                    headers_returned: peer_response.peer_headers.len() as u64,
1✔
310
                    peer_fork_hash_index: peer_response.peer_fork_hash_index,
1✔
311
                    header_sync_status,
1✔
312
                })
1✔
313
            },
314
        }
315
    }
2✔
316

317
    #[allow(clippy::too_many_lines)]
318
    async fn find_chain_split(
2✔
319
        &mut self,
2✔
320
        peer_node_id: &NodeId,
2✔
321
        client: &mut rpc::BaseNodeSyncRpcClient,
2✔
322
        header_count: u64,
2✔
323
    ) -> Result<FindChainSplitResult, BlockHeaderSyncError> {
2✔
324
        const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
325
        // Limit how far back we're willing to go. A peer might just say it does not have a chain split
326
        // and keep us busy going back until the genesis.
327
        // 20 x 500 = max 10,000 block split can be detected
328
        const MAX_CHAIN_SPLIT_ITERS: usize = 20;
329

330
        let mut offset = 0;
2✔
331
        let mut iter_count = 0;
2✔
332
        loop {
333
            iter_count += 1;
2✔
334
            if iter_count > MAX_CHAIN_SPLIT_ITERS {
2✔
335
                warn!(
×
336
                    target: LOG_TARGET,
×
337
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
338
                    peer_node_id,
×
339
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
×
340
                );
341
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
342
            }
2✔
343

344
            let block_hashes = self
2✔
345
                .db
2✔
346
                .fetch_block_hashes_from_header_tip(NUM_CHAIN_SPLIT_HEADERS, offset)
2✔
347
                .await?;
2✔
348
            debug!(
2✔
349
                target: LOG_TARGET,
×
350
                "Determining if chain splits between {} and {} headers back from the tip (peer: `{}`, {} hashes sent)",
×
351
                offset,
×
352
                offset + NUM_CHAIN_SPLIT_HEADERS,
×
353
                peer_node_id,
×
354
                block_hashes.len()
×
355
            );
356

357
            // No further hashes to send.
358
            if block_hashes.is_empty() {
2✔
359
                warn!(
×
360
                    target: LOG_TARGET,
×
361
                    "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
362
                    peer_node_id,
×
363
                    NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
×
364
                );
365
                return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
366
            }
2✔
367

2✔
368
            let request = FindChainSplitRequest {
2✔
369
                block_hashes: block_hashes.clone().iter().map(|v| v.to_vec()).collect(),
3✔
370
                header_count,
2✔
371
            };
2✔
372

373
            let resp = match client.find_chain_split(request).await {
2✔
374
                Ok(r) => r,
2✔
375
                Err(RpcError::RequestFailed(err)) if err.as_status_code().is_not_found() => {
×
376
                    // This round we sent less hashes than the max, so the next round will not have any more hashes to
×
377
                    // send. Exit early in this case.
×
378
                    if block_hashes.len() < NUM_CHAIN_SPLIT_HEADERS {
×
379
                        warn!(
×
380
                            target: LOG_TARGET,
×
381
                            "Peer `{}` did not provide a chain split after {} headers requested. Peer will be banned.",
×
382
                            peer_node_id,
×
383
                            NUM_CHAIN_SPLIT_HEADERS * MAX_CHAIN_SPLIT_ITERS,
×
384
                        );
385
                        return Err(BlockHeaderSyncError::ChainSplitNotFound(peer_node_id.clone()));
×
386
                    }
×
387
                    // Chain split not found, let's go further back
×
388
                    offset = NUM_CHAIN_SPLIT_HEADERS * iter_count;
×
389
                    continue;
×
390
                },
391
                Err(err) => {
×
392
                    return Err(err.into());
×
393
                },
394
            };
395
            if resp.headers.len() > HEADER_SYNC_INITIAL_MAX_HEADERS {
2✔
396
                warn!(
×
397
                    target: LOG_TARGET,
×
398
                    "Peer `{}` sent too many headers {}, only requested {}. Peer will be banned.",
×
399
                    peer_node_id,
×
400
                    resp.headers.len(),
×
401
                    HEADER_SYNC_INITIAL_MAX_HEADERS,
402
                );
403
                return Err(BlockHeaderSyncError::PeerSentTooManyHeaders(resp.headers.len()));
×
404
            }
2✔
405
            if resp.fork_hash_index >= block_hashes.len() as u64 {
2✔
406
                warn!(
×
407
                    target: LOG_TARGET,
×
408
                    "Peer `{}` sent hash index {} out of range {}. Peer will be banned.",
×
409
                    peer_node_id,
×
410
                    resp.fork_hash_index,
×
411
                    block_hashes.len(),
×
412
                );
413
                return Err(BlockHeaderSyncError::FoundHashIndexOutOfRange(
×
414
                    block_hashes.len() as u64,
×
415
                    resp.fork_hash_index,
×
416
                ));
×
417
            }
2✔
418
            #[allow(clippy::cast_possible_truncation)]
2✔
419
            if !resp.headers.is_empty() && resp.headers[0].prev_hash != block_hashes[resp.fork_hash_index as usize] {
2✔
420
                warn!(
×
421
                    target: LOG_TARGET,
×
422
                    "Peer `{}` sent hash an invalid protocol response, incorrect fork hash index {}. Peer will be banned.",
×
423
                    peer_node_id,
424
                    resp.fork_hash_index,
425
                );
426
                return Err(BlockHeaderSyncError::InvalidProtocolResponse(
×
427
                    "Peer sent incorrect fork hash index".into(),
×
428
                ));
×
429
            }
2✔
430
            #[allow(clippy::cast_possible_truncation)]
2✔
431
            let chain_split_hash = block_hashes[resp.fork_hash_index as usize];
2✔
432

2✔
433
            return Ok(FindChainSplitResult {
2✔
434
                reorg_steps_back: resp.fork_hash_index.saturating_add(offset as u64),
2✔
435
                peer_headers: resp.headers,
2✔
436
                peer_fork_hash_index: resp.fork_hash_index,
2✔
437
                chain_split_hash,
2✔
438
            });
2✔
439
        }
440
    }
2✔
441

442
    /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
443
    /// of the chain split (see [HeaderSyncStatus]).
444
    ///
445
    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
446
    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
447
    async fn determine_sync_status(
2✔
448
        &mut self,
2✔
449
        sync_peer: &SyncPeer,
2✔
450
        best_header: ChainHeader,
2✔
451
        best_block_header: ChainHeader,
2✔
452
        client: &mut rpc::BaseNodeSyncRpcClient,
2✔
453
    ) -> Result<(HeaderSyncStatus, FindChainSplitResult), BlockHeaderSyncError> {
2✔
454
        // This method will return ban-able errors for certain offenses.
455
        let chain_split_result = self
2✔
456
            .find_chain_split(sync_peer.node_id(), client, HEADER_SYNC_INITIAL_MAX_HEADERS as u64)
2✔
457
            .await?;
2✔
458
        if chain_split_result.reorg_steps_back > 0 {
2✔
UNCOV
459
            debug!(
×
460
                target: LOG_TARGET,
×
461
                "Found chain split {} blocks back, received {} headers from peer `{}`",
×
462
                chain_split_result.reorg_steps_back,
×
463
                chain_split_result.peer_headers.len(),
×
464
                sync_peer
465
            );
466
        }
2✔
467

468
        // If the peer returned no new headers, they may still have more blocks than we have, thus have a higher
469
        // accumulated difficulty.
470
        if chain_split_result.peer_headers.is_empty() {
2✔
471
            // Our POW is less than the peer's POW, as verified before the attempted header sync, therefore, if the
472
            // peer did not supply any headers and we know we are behind based on the peer's claimed metadata, then
473
            // we can ban the peer.
474
            if best_header.height() == best_block_header.height() {
1✔
475
                warn!(
×
476
                    target: LOG_TARGET,
×
477
                    "Peer `{}` did not provide any headers although they have a better chain and more headers: their \
×
478
                    difficulty: {}, our difficulty: {}. Peer will be banned.",
×
479
                    sync_peer.node_id(),
×
480
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
481
                    best_block_header.accumulated_data().total_accumulated_difficulty,
×
482
                );
483
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
×
484
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
485
                    actual: None,
×
486
                    local: best_block_header.accumulated_data().total_accumulated_difficulty,
×
487
                });
×
488
            }
1✔
489
            debug!(target: LOG_TARGET, "Peer `{}` sent no headers; headers already in sync with peer.", sync_peer.node_id());
1✔
490
            return Ok((HeaderSyncStatus::InSyncOrAhead, chain_split_result));
1✔
491
        }
1✔
492

493
        let headers = chain_split_result
1✔
494
            .peer_headers
1✔
495
            .clone()
1✔
496
            .into_iter()
1✔
497
            .map(BlockHeader::try_from)
1✔
498
            .collect::<Result<Vec<_>, _>>()
1✔
499
            .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
1✔
500
        let num_new_headers = headers.len();
1✔
501
        // Do a cheap check to verify that we do not have these series of headers in the db already - if the 1st one is
1✔
502
        // not there most probably the rest are not either - the peer could still have returned old headers later on in
1✔
503
        // the list
1✔
504
        if self.db.fetch_header_by_block_hash(headers[0].hash()).await?.is_some() {
1✔
505
            return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
×
506
                "Header already in database".to_string(),
×
507
            ));
×
508
        };
1✔
509

1✔
510
        self.header_validator
1✔
511
            .initialize_state(&chain_split_result.chain_split_hash)
1✔
512
            .await?;
1✔
513
        for header in headers {
2✔
514
            debug!(
1✔
515
                target: LOG_TARGET,
×
516
                "Validating header #{} (Pow: {}) with hash: ({})",
×
517
                header.height,
×
518
                header.pow_algo(),
×
519
                header.hash().to_hex(),
×
520
            );
521
            self.header_validator.validate(header).await?;
1✔
522
        }
523

524
        debug!(
1✔
525
            target: LOG_TARGET,
×
526
            "Peer `{}` has submitted {} valid header(s)", sync_peer.node_id(), num_new_headers
×
527
        );
528

529
        let chain_split_info = ChainSplitInfo {
1✔
530
            best_block_header,
1✔
531
            reorg_steps_back: chain_split_result.reorg_steps_back,
1✔
532
            chain_split_hash: chain_split_result.chain_split_hash,
1✔
533
        };
1✔
534
        Ok((
1✔
535
            HeaderSyncStatus::Lagging(Box::new(chain_split_info)),
1✔
536
            chain_split_result,
1✔
537
        ))
1✔
538
    }
2✔
539

540
    async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
×
541
        debug!(
×
542
            target: LOG_TARGET,
×
543
            "Deleting headers that no longer form part of the main chain up until split at {}",
×
544
            split_hash.to_hex()
×
545
        );
546

547
        let blocks = self.db.rewind_to_hash(split_hash).await?;
×
548
        debug!(
×
549
            target: LOG_TARGET,
×
550
            "Rewound {} block(s) in preparation for header sync",
×
551
            blocks.len()
×
552
        );
553
        Ok(blocks)
×
554
    }
×
555

556
    #[allow(clippy::too_many_lines)]
557
    async fn synchronize_headers(
1✔
558
        &mut self,
1✔
559
        mut sync_peer: SyncPeer,
1✔
560
        client: &mut rpc::BaseNodeSyncRpcClient,
1✔
561
        split_info: ChainSplitInfo,
1✔
562
        max_latency: Duration,
1✔
563
    ) -> Result<(), BlockHeaderSyncError> {
1✔
564
        info!(target: LOG_TARGET, "Starting header sync from peer {}", sync_peer);
1✔
565
        const COMMIT_EVERY_N_HEADERS: usize = 1000;
566

567
        let mut has_switched_to_new_chain = false;
1✔
568
        let pending_len = self.header_validator.valid_headers().len();

        // Find the hash to start syncing the rest of the headers.
        // The expectation cannot fail because there has been at least one valid header returned (checked in
        // determine_sync_status)
        let (start_header_height, start_header_hash, total_accumulated_difficulty) = self
            .header_validator
            .current_valid_chain_tip_header()
            .map(|h| (h.height(), *h.hash(), h.accumulated_data().total_accumulated_difficulty))
            .expect("synchronize_headers: expected there to be a valid tip header but it was None");

        // If we already have a stronger chain at this point, switch over to it,
        // just in case we happen to be exactly HEADER_SYNC_INITIAL_MAX_HEADERS headers behind.
        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block_header);

        if has_better_pow {
            debug!(
                target: LOG_TARGET,
                "Remote chain from peer {} has higher PoW. Switching",
                sync_peer.node_id()
            );
            self.switch_to_pending_chain(&split_info).await?;
            has_switched_to_new_chain = true;
        }

        if pending_len < HEADER_SYNC_INITIAL_MAX_HEADERS {
            // Peer returned fewer than the maximum number of requested headers. This indicates that we have all the
            // available headers from the peer.
            if !has_better_pow {
                // Because the PoW is less than or equal to that of the current chain, the peer must have lied about
                // their PoW
                debug!(target: LOG_TARGET, "No further headers to download");
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: Some(total_accumulated_difficulty),
                    local: split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty,
                });
            }
            // The PoW is higher, we have swapped to the higher chain and we have all of the better chain's headers,
            // so we can move on to block sync.
            return Ok(());
        }

        debug!(
            target: LOG_TARGET,
            "Download remaining headers starting from header #{} from peer `{}`",
            start_header_height,
            sync_peer.node_id()
        );
        let request = SyncHeadersRequest {
            start_hash: start_header_hash.to_vec(),
            // To the tip!
            count: 0,
        };

        let mut header_stream = client.sync_headers(request).await?;
        debug!(
            target: LOG_TARGET,
            "Reading headers from peer `{}`",
            sync_peer.node_id()
        );

        let mut last_sync_timer = Instant::now();

        let mut last_total_accumulated_difficulty = U512::zero();
        let mut avg_latency = RollingAverageTime::new(20);
        let mut prev_height: Option<u64> = None;
        while let Some(header) = header_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let header = BlockHeader::try_from(header?).map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
            debug!(
                target: LOG_TARGET,
                "Validating header #{} (Pow: {}) with hash: ({}). Latency: {:.2?}",
                header.height,
                header.pow_algo(),
                header.hash().to_hex(),
                latency
            );
            trace!(
                target: LOG_TARGET,
                "{}",
                header
            );
            if let Some(prev_header_height) = prev_height {
                if header.height != prev_header_height.saturating_add(1) {
                    warn!(
                        target: LOG_TARGET,
                        "Received header #{} `{}` does not follow previous header",
                        header.height,
                        header.hash().to_hex()
                    );
                    return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                        "Header does not follow previous header".to_string(),
                    ));
                }
            }
            let existing_header = self.db.fetch_header_by_block_hash(header.hash()).await?;
            if let Some(h) = existing_header {
                warn!(
                    target: LOG_TARGET,
                    "Received header #{} `{}` that we already have.",
                    h.height,
                    h.hash().to_hex()
                );
                return Err(BlockHeaderSyncError::ReceivedInvalidHeader(
                    "Header already in database".to_string(),
                ));
            }
            let current_height = header.height;
            last_total_accumulated_difficulty = self.header_validator.validate(header).await?;

            if has_switched_to_new_chain {
                // If we've switched to the new chain, we simply commit every COMMIT_EVERY_N_HEADERS headers
                if self.header_validator.valid_headers().len() >= COMMIT_EVERY_N_HEADERS {
                    self.commit_pending_headers().await?;
                }
            } else {
                // The remote chain has not (yet) been accepted.
                // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
                // achieved.
                if self.pending_chain_has_higher_pow(&split_info.best_block_header) {
                    self.switch_to_pending_chain(&split_info).await?;
                    has_switched_to_new_chain = true;
                }
            }

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            self.hooks.call_on_progress_header_hooks(
                current_height,
                sync_peer.claimed_chain_metadata().best_block_height(),
                &sync_peer,
            );

            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
            if let Some(avg_latency) = last_avg_latency {
                if avg_latency > max_latency {
                    return Err(BlockHeaderSyncError::MaxLatencyExceeded {
                        peer: sync_peer.node_id().clone(),
                        latency: avg_latency,
                        max_latency,
                    });
                }
            }

            last_sync_timer = Instant::now();
            prev_height = Some(current_height);
        }

        if !has_switched_to_new_chain {
            if sync_peer.claimed_chain_metadata().accumulated_difficulty() <
                self.header_validator
                    .current_valid_chain_tip_header()
                    .map(|h| h.accumulated_data().total_accumulated_difficulty)
                    .unwrap_or_default()
            {
                // We should only return this error if the peer sent a PoW less than they advertised.
                return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                    claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    actual: self
                        .header_validator
                        .current_valid_chain_tip_header()
                        .map(|h| h.accumulated_data().total_accumulated_difficulty),
                    local: split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty,
                });
            } else {
                warn!(
                    target: LOG_TARGET,
                    "Received PoW from peer matches claimed difficulty #{}, but local is higher: ({}) and we have not \
                     swapped. Ignoring",
                    sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                    split_info
                        .best_block_header
                        .accumulated_data()
                        .total_accumulated_difficulty
                );
                return Ok(());
            }
        }

        // Commit the last headers that don't fit into a full COMMIT_EVERY_N_HEADERS batch
        if !self.header_validator.valid_headers().is_empty() {
            self.commit_pending_headers().await?;
        }

        let claimed_total_accumulated_diff = sync_peer.claimed_chain_metadata().accumulated_difficulty();
        // This rule is strict: if the peer advertised a higher PoW than they were able to provide (without
        // some other external factor like a disconnect etc), we detect this and ban the peer.
        if last_total_accumulated_difficulty < claimed_total_accumulated_diff {
            return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                claimed: claimed_total_accumulated_diff,
                actual: Some(last_total_accumulated_difficulty),
                local: split_info
                    .best_block_header
                    .accumulated_data()
                    .total_accumulated_difficulty,
            });
        }

        Ok(())
    }

    async fn commit_pending_headers(&mut self) -> Result<ChainHeader, BlockHeaderSyncError> {
        let chain_headers = self.header_validator.take_valid_headers();
        let num_headers = chain_headers.len();
        let start = Instant::now();

        let new_tip = chain_headers.last().cloned().unwrap();
        let mut txn = self.db.write_transaction();
        chain_headers.into_iter().for_each(|chain_header| {
            txn.insert_chain_header(chain_header);
        });

        txn.commit().await?;

        debug!(
            target: LOG_TARGET,
            "{} header(s) committed (tip = {}) to the blockchain db in {:.2?}",
            num_headers,
            new_tip.height(),
            start.elapsed()
        );

        Ok(new_tip)
    }

    fn pending_chain_has_higher_pow(&self, current_tip: &ChainHeader) -> bool {
        let chain_headers = self.header_validator.valid_headers();
        if chain_headers.is_empty() {
            return false;
        }

        // Check that the remote tip is stronger than the local tip. An equal chain should not have ended up here,
        // so we treat equal as less.
        let proposed_tip = chain_headers.last().unwrap();
        self.header_validator.compare_chains(current_tip, proposed_tip).is_lt()
    }

    async fn switch_to_pending_chain(&mut self, split_info: &ChainSplitInfo) -> Result<(), BlockHeaderSyncError> {
        // Reorg if required
        if split_info.reorg_steps_back > 0 {
            debug!(
                target: LOG_TARGET,
                "Reorg: Rewinding the chain by {} block(s) (split hash = {})",
                split_info.reorg_steps_back,
                split_info.chain_split_hash.to_hex()
            );
            let blocks = self.rewind_blockchain(split_info.chain_split_hash).await?;
            if !blocks.is_empty() {
                self.hooks.call_on_rewind_hooks(blocks);
            }
        }

        // Commit the forked chain. At this point:
        // 1. Headers have been validated.
        // 2. The forked chain has a higher PoW than the local chain.
        //
        // After this we commit headers every `n` blocks.
        self.commit_pending_headers().await?;

        Ok(())
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index of the node_id inside the vec of sync peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }
}

#[derive(Debug, Clone)]
struct FindChainSplitResult {
    reorg_steps_back: u64,
    peer_headers: Vec<ProtoBlockHeader>,
    peer_fork_hash_index: u64,
    chain_split_hash: HashOutput,
}

/// Information about the chain split from the remote node.
#[derive(Debug, Clone, PartialEq)]
pub struct ChainSplitInfo {
    /// The best block's header on the local chain.
    pub best_block_header: ChainHeader,
    /// The number of blocks to reorg back to the fork.
    pub reorg_steps_back: u64,
    /// The hash of the block at the fork.
    pub chain_split_hash: HashOutput,
}

/// The result of an attempt to synchronize headers with a peer.
#[derive(Debug, Clone, PartialEq)]
pub struct AttemptSyncResult {
    /// The number of headers that were returned.
    pub headers_returned: u64,
    /// The fork hash index of the remote peer.
    pub peer_fork_hash_index: u64,
    /// The header sync status.
    pub header_sync_status: HeaderSyncStatus,
}

#[derive(Debug, Clone, PartialEq)]
pub enum HeaderSyncStatus {
    /// Local and remote node are in sync or ahead.
    InSyncOrAhead,
    /// Local node is lagging behind the remote node.
    Lagging(Box<ChainSplitInfo>),
}