tari-project / tari / build 16123384529 (push · github · web-flow)

07 Jul 2025 05:11PM UTC coverage: 64.327% (-7.6%) from 71.89%

chore: new release v4.9.0-pre.0 (#7289)

Description
---
new release esmeralda

77151 of 119935 relevant lines covered (64.33%)

227108.34 hits per line

Source File
/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs (0.0% covered)
// Copyright 2019. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp::Ordering, time::Instant};

use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    base_node::{
        comms_interface::BlockEvent,
        state_machine_service::states::{BlockSyncInfo, StateEvent, StateInfo, StatusInfo},
        sync::{BlockHeaderSyncError, HeaderSynchronizer, SyncPeer},
        BaseNodeStateMachine,
    },
    chain_storage::BlockchainBackend,
};

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug)]
pub struct HeaderSyncState {
    sync_peers: Vec<SyncPeer>,
    is_synced: bool,
    local_metadata: ChainMetadata,
}

impl HeaderSyncState {
    pub fn new(mut sync_peers: Vec<SyncPeer>, local_metadata: ChainMetadata) -> Self {
        // Sort peers by claimed difficulty, then by latency (lowest first)
        sync_peers.sort_by(|a, b| match a.claimed_difficulty().cmp(&b.claimed_difficulty()) {
            Ordering::Less => Ordering::Less,
            Ordering::Greater => Ordering::Greater,
            Ordering::Equal => {
                match (a.latency(), b.latency()) {
                    (None, None) => Ordering::Equal,
                    // No latency goes to the end
                    (Some(_), None) => Ordering::Less,
                    (None, Some(_)) => Ordering::Greater,
                    (Some(la), Some(lb)) => la.cmp(&lb),
                }
            },
        });
        Self {
            sync_peers,
            is_synced: false,
            local_metadata,
        }
    }

    pub fn is_synced(&self) -> bool {
        self.is_synced
    }

    pub fn into_sync_peers(self) -> Vec<SyncPeer> {
        self.sync_peers
    }

    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Converting u64 to i64 is okay as the future time limit is in the hundreds, well below even u32::MAX
    #[allow(clippy::too_many_lines)]
    #[allow(clippy::cast_possible_wrap)]
    pub async fn next_event<B: BlockchainBackend + 'static>(
        &mut self,
        shared: &mut BaseNodeStateMachine<B>,
    ) -> StateEvent {
        // Only sync to peers with better claimed accumulated difficulty than the local chain: this may be possible
        // at this stage due to read-write lock race conditions in the database
        match shared.db.get_chain_metadata().await {
            Ok(best_block_metadata) => {
                let mut remove = Vec::new();
                for sync_peer in &self.sync_peers {
                    if sync_peer.claimed_chain_metadata().accumulated_difficulty() <=
                        best_block_metadata.accumulated_difficulty()
                    {
                        remove.push(sync_peer.node_id().clone());
                    }
                }
                for node_id in remove {
                    self.remove_sync_peer(&node_id);
                }
                if self.sync_peers.is_empty() {
                    // Go back to Listening state
                    return StateEvent::Continue;
                }
            },
            Err(e) => return StateEvent::FatalError(format!("{}", e)),
        }

        let mut synchronizer = HeaderSynchronizer::new(
            shared.config.blockchain_sync_config.clone(),
            shared.db.clone(),
            shared.consensus_rules.clone(),
            shared.connectivity.clone(),
            &mut self.sync_peers,
            shared.randomx_factory.clone(),
            &self.local_metadata,
        );

        let status_event_sender = shared.status_event_sender.clone();
        let bootstrapped = shared.is_bootstrapped();
        let randomx_vm_cnt = shared.get_randomx_vm_cnt();
        let randomx_vm_flags = shared.get_randomx_vm_flags();
        synchronizer.on_starting(move |sync_peer| {
            let _result = status_event_sender.send(StatusInfo {
                bootstrapped,
                state_info: StateInfo::Connecting(sync_peer.clone()),
                randomx_vm_cnt,
                randomx_vm_flags,
            });
        });

        let status_event_sender = shared.status_event_sender.clone();
        synchronizer.on_progress(move |current_height, remote_tip_height, sync_peer| {
            let details = BlockSyncInfo {
                tip_height: remote_tip_height,
                local_height: current_height,
                sync_peer: sync_peer.clone(),
            };
            let _result = status_event_sender.send(StatusInfo {
                bootstrapped,
                state_info: StateInfo::HeaderSync(Some(details)),
                randomx_vm_cnt,
                randomx_vm_flags,
            });
        });

        let local_nci = shared.local_node_interface.clone();
        synchronizer.on_rewind(move |removed| {
            #[cfg(feature = "metrics")]
            if let Some(fork_height) = removed.last().map(|b| b.height().saturating_sub(1)) {
                metrics::tip_height().set(fork_height as i64);
                metrics::reorg(fork_height, 0, removed.len()).inc();
            }

            local_nci.publish_block_event(BlockEvent::BlockSyncRewind(removed));
        });

        let timer = Instant::now();
        match synchronizer.synchronize().await {
            Ok((sync_peer, sync_result)) => {
                info!(
                    target: LOG_TARGET,
                    "Headers synchronized from peer {} in {:.0?}",
                    sync_peer,
                    timer.elapsed()
                );
                // Move the sync peer used in header sync to the front of the queue
                if let Some(pos) = self.sync_peers.iter().position(|p| *p == sync_peer) {
                    if pos > 0 {
                        let sync_peer = self.sync_peers.remove(pos);
                        self.sync_peers.insert(0, sync_peer);
                    }
                }
                self.is_synced = true;
                StateEvent::HeadersSynchronized(sync_peer, sync_result)
            },
            Err(err) => {
                let _ignore = shared.status_event_sender.send(StatusInfo {
                    bootstrapped,
                    state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()),
                    randomx_vm_cnt,
                    randomx_vm_flags,
                });
                match err {
                    BlockHeaderSyncError::SyncFailedAllPeers => {
                        error!(target: LOG_TARGET, "Header sync failed with all peers. Error: {}", err);
                        warn!(target: LOG_TARGET, "{}. Continuing...", err);
                        StateEvent::Continue
                    },
                    _ => {
                        debug!(target: LOG_TARGET, "Header sync failed: {}", err);
                        StateEvent::HeaderSyncFailed(err.to_string())
                    },
                }
            },
        }
    }
}