tari-project / tari · build 17033178607 (push, via github)

18 Aug 2025 06:45AM UTC coverage: 54.49% (-0.007%) from 54.497%

stringhandler: Merge branch 'development' of github.com:tari-project/tari into odev

971 of 2923 new or added lines in 369 files covered. (33.22%)
5804 existing lines in 173 files now uncovered.
76688 of 140739 relevant lines covered (54.49%)
193850.18 hits per line

Source File (0.0% covered):
/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
//  Copyright 2022, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    cmp,
    convert::{TryFrom, TryInto},
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use tari_common_types::types::{CompressedCommitment, FixedHash, RangeProofService};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
use tari_crypto::commitment::HomomorphicCommitment;
use tari_utilities::{hex::Hex, ByteArray};
use tokio::task;

use super::error::HorizonSyncError;
use crate::{
    base_node::sync::{
        ban::PeerBanManager,
        hooks::Hooks,
        horizon_state_sync::{HorizonSyncInfo, HorizonSyncStatus},
        rpc,
        rpc::BaseNodeSyncRpcClient,
        BlockchainSyncConfig,
        SyncPeer,
    },
    blocks::{BlockHeader, ChainHeader, UpdateBlockAccumulatedData},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree},
    common::{rolling_avg::RollingAverageTime, BanPeriod},
    consensus::ConsensusManager,
    proto::base_node::{sync_utxos_response::Txo, SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse},
    transactions::transaction_components::{
        transaction_output::batch_verify_range_proofs,
        TransactionKernel,
        TransactionOutput,
    },
    validation::{
        aggregate_body::validate_individual_output,
        helpers::validate_output_version,
        FinalHorizonStateValidation,
    },
    PrunedKernelMmr,
};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct HorizonStateSynchronization<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    rules: ConsensusManager,
    sync_peers: &'a mut Vec<SyncPeer>,
    horizon_sync_height: u64,
    prover: Arc<RangeProofService>,
    num_kernels: u64,
    num_outputs: u64,
    hooks: Hooks,
    connectivity: ConnectivityRequester,
    final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
    max_latency: Duration,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        connectivity: ConnectivityRequester,
        rules: ConsensusManager,
        sync_peers: &'a mut Vec<SyncPeer>,
        horizon_sync_height: u64,
        prover: Arc<RangeProofService>,
        final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            max_latency: config.initial_max_sync_latency,
            config,
            db,
            rules,
            connectivity,
            sync_peers,
            horizon_sync_height,
            prover,
            num_kernels: 0,
            num_outputs: 0,
            hooks: Hooks::default(),
            final_state_validator,
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(HorizonSyncInfo) + Send + Sync + 'static {
        self.hooks.add_on_progress_horizon_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(), HorizonSyncError> {
        if self.sync_peers.is_empty() {
            return Err(HorizonSyncError::NoSyncPeers);
        }

        debug!(
            target: LOG_TARGET,
            "Preparing database for horizon sync to height #{}", self.horizon_sync_height
        );
        let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
            ChainStorageError::ValueNotFound {
                entity: "Header",
                field: "height",
                value: self.horizon_sync_height.to_string(),
            }
        })?;

        let mut latency_increases_counter = 0;
        loop {
            match self.sync(&to_header).await {
                Ok(()) => return Ok(()),
                Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => {
                    // If we don't have many sync peers to select from, return to the listening state and see if we
                    // can get some more.
                    warn!(
                        target: LOG_TARGET,
                        "Slow sync peers detected: {}",
                        self.sync_peers
                            .iter()
                            .map(|p| format!("{} ({:.2?})", p.node_id(), p.latency().unwrap_or_default()))
                            .collect::<Vec<_>>()
                            .join(", ")
                    );
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    }
                    self.max_latency += self.config.max_latency_increase;
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                },
                Err(err) => return Err(err),
            }
        }
    }
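
    // A minimal illustration (not part of this file) of how the retry loop above widens the
    // latency budget: `max_latency` starts at `initial_max_sync_latency` and grows by
    // `max_latency_increase` on every `AllSyncPeersExceedLatency` error, for at most
    // MAX_LATENCY_INCREASES rounds. The concrete durations below are assumed for the example.
    //
    //     use std::time::Duration;
    //
    //     let mut max_latency = Duration::from_secs(2);  // assumed initial_max_sync_latency
    //     let increase = Duration::from_millis(500);     // assumed max_latency_increase
    //     for round in 1..=5 {
    //         max_latency += increase;
    //         println!("round {round}: allowing up to {max_latency:?}");
    //     }
    //     // After round 5 (4.5s here), the error is returned to the caller instead.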

    async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync horizon state ({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            match self.connect_and_attempt_sync(&node_id, to_header).await {
                Ok(_) => return Ok(()),
                // Try another peer
                Err(err) => {
                    let ban_reason = HorizonSyncError::get_ban_reason(&err);

                    if let Some(reason) = ban_reason {
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        warn!(target: LOG_TARGET, "{err}");
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let HorizonSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(HorizonSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(HorizonSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(HorizonSyncError::FailedSyncAllPeers)
        }
    }

    async fn connect_and_attempt_sync(
        &mut self,
        node_id: &NodeId,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        // Connect
        let (mut client, sync_peer) = self.connect_sync_peer(node_id).await?;

        // Perform horizon sync
        debug!(target: LOG_TARGET, "Check if pruning is needed");
        self.prune_if_needed().await?;
        self.sync_kernels_and_outputs(sync_peer.clone(), &mut client, to_header)
            .await?;

        // Validate and finalize horizon sync
        self.finalize_horizon_sync(&sync_peer).await?;

        Ok(())
    }

    async fn connect_sync_peer(
        &mut self,
        node_id: &NodeId,
    ) -> Result<(BaseNodeSyncRpcClient, SyncPeer), HorizonSyncError> {
        let peer_index = self
            .get_sync_peer_index(node_id)
            .ok_or(HorizonSyncError::PeerNotFound)?;
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
        self.hooks.call_on_starting_hook(sync_peer);

        let mut conn = self.dial_sync_peer(node_id).await?;
        debug!(
            target: LOG_TARGET,
            "Attempting to synchronize horizon state with `{node_id}`"
        );

        let config = RpcClient::builder()
            .with_deadline(self.config.rpc_deadline)
            .with_deadline_grace_period(Duration::from_secs(5));

        let mut client = conn
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
            .await?;

        let latency = client
            .get_last_request_latency()
            .expect("unreachable panic: last request latency must be set after connect");
        self.sync_peers
            .get_mut(peer_index)
            .expect("Already checked")
            .set_latency(latency);
        if latency > self.max_latency {
            return Err(HorizonSyncError::MaxLatencyExceeded {
                peer: conn.peer_node_id().clone(),
                latency,
                max_latency: self.max_latency,
            });
        }
        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");

        Ok((
            client,
            self.sync_peers.get(peer_index).expect("Already checked").clone(),
        ))
    }

    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
        let timer = Instant::now();
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
        info!(
            target: LOG_TARGET,
            "Successfully dialed sync peer {} in {:.2?}",
            node_id,
            timer.elapsed()
        );
        Ok(conn)
    }

    async fn sync_kernels_and_outputs(
        &mut self,
        sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        // Note: We do not need to rewind kernels if the sync fails due to it being validated when inserted into
        //       the database. Furthermore, these kernels will also be successfully removed when we need to rewind
        //       the blockchain for whatever reason.
        debug!(target: LOG_TARGET, "Synchronizing kernels");
        self.synchronize_kernels(sync_peer.clone(), client, to_header).await?;
        debug!(target: LOG_TARGET, "Synchronizing outputs");
        // let cloned_backup_smt = self.db.inner().smt_read_access()?.clone();
        match self.synchronize_outputs(sync_peer, client, to_header).await {
            Ok(_) => Ok(()),
            Err(err) => {
                // We need to clean up the outputs
                let _ = self.clean_up_failed_output_sync(to_header).await;
                // let mut smt = self.db.inner().smt_write_access()?;
                // *smt = cloned_backup_smt;
                Err(err)
            },
        }
    }

    /// We clean up a failed output sync attempt and ignore any errors that occur during the clean up process.
    async fn clean_up_failed_output_sync(&mut self, to_header: &BlockHeader) {
        let tip_header = if let Ok(header) = self.db.fetch_tip_header().await {
            header
        } else {
            return;
        };
        let db = self.db().clone();
        let mut txn = db.write_transaction();
        let mut current_header = to_header.clone();
        loop {
            if let Ok(outputs) = self.db.fetch_outputs_in_block(current_header.hash()).await {
                for (count, output) in (1..=outputs.len()).zip(outputs.iter()) {
                    // Note: We do not need to clean up the SMT as it was not saved in the database yet, however, we
                    // need to clean up the outputs
                    txn.prune_output_from_all_dbs(
                        output.hash(),
                        output.commitment.clone(),
                        output.features.output_type,
                    );
                    if let Err(e) = txn.commit().await {
                        warn!(
                            target: LOG_TARGET,
                            "Clean up failed sync - prune output from all dbs for header '{}': {}",
                            current_header.hash(), e
                        );
                    }
                    if count % 100 == 0 || count == outputs.len() {
                        if let Err(e) = txn.commit().await {
                            warn!(
                                target: LOG_TARGET,
                                "Clean up failed sync - commit prune outputs for header '{}': {}",
                                current_header.hash(), e
                            );
                        }
                    }
                }
            }
            if let Err(e) = txn.commit().await {
                warn!(
                    target: LOG_TARGET, "Clean up failed output sync - commit delete kernels for header '{}': {}",
                    current_header.hash(), e
                );
            }
            if let Ok(header) = db.fetch_header_by_block_hash(current_header.prev_hash).await {
                if let Some(previous_header) = header {
                    current_header = previous_header;
                } else {
                    warn!(target: LOG_TARGET, "Could not clean up failed output sync, previous_header link missing from db");
                    break;
                }
            } else {
                warn!(
                    target: LOG_TARGET,
                    "Could not clean up failed output sync, header '{}' not in db",
                    current_header.prev_hash.to_hex()
                );
                break;
            }
            if &current_header.hash() == tip_header.hash() {
                debug!(target: LOG_TARGET, "Finished cleaning up failed output sync");
                break;
            }
        }
    }
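
    // A minimal sketch (assumed values, not part of this file) of the chunked-commit pattern used
    // above: work is accumulated in one write transaction and flushed every 100 items and again at
    // the end, so a failure never leaves more than one chunk's worth of work uncommitted.
    //
    //     let items: Vec<u32> = (1..=250).collect();
    //     let mut batch = Vec::new();
    //     for (count, item) in (1..=items.len()).zip(items.iter()) {
    //         batch.push(*item);
    //         if count % 100 == 0 || count == items.len() {
    //             println!("committing {} item(s)", batch.len()); // stand-in for txn.commit().await
    //             batch.clear();
    //         }
    //     }
    //     // Commits chunks of 100, 100 and 50.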

    async fn prune_if_needed(&mut self) -> Result<(), HorizonSyncError> {
        let local_metadata = self.db.get_chain_metadata().await?;
        let new_prune_height = cmp::min(local_metadata.best_block_height(), self.horizon_sync_height);
        if local_metadata.pruned_height() < new_prune_height {
            debug!(target: LOG_TARGET, "Pruning block chain to height {new_prune_height}");
            self.db.prune_to_height(new_prune_height).await?;
        }

        Ok(())
    }
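
    // Worked example (assumed heights) for the prune target above: the node can never prune beyond
    // what it holds locally, hence the min() of the local best block and the horizon height.
    //
    //     use std::cmp;
    //
    //     let (best_block_height, horizon_sync_height, pruned_height) = (800u64, 1_000u64, 500u64);
    //     let new_prune_height = cmp::min(best_block_height, horizon_sync_height);
    //     assert_eq!(new_prune_height, 800);
    //     assert!(pruned_height < new_prune_height); // so prune_to_height(800) runs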

    #[allow(clippy::too_many_lines)]
    async fn synchronize_kernels(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        info!(target: LOG_TARGET, "Starting kernel sync from peer {sync_peer}");
        let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;

        let remote_num_kernels = to_header.kernel_mmr_size;
        self.num_kernels = remote_num_kernels;

        if local_num_kernels >= remote_num_kernels {
            debug!(target: LOG_TARGET, "Local kernel set already synchronized");
            return Ok(());
        }

        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
            current: local_num_kernels,
            total: remote_num_kernels,
            sync_peer: sync_peer.clone(),
        });
        self.hooks.call_on_progress_horizon_hooks(info);

        debug!(
            target: LOG_TARGET,
            "Requesting kernels from {} to {} ({} remaining)",
            local_num_kernels,
            remote_num_kernels,
            remote_num_kernels.saturating_sub(local_num_kernels),
        );

        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating kernel sync with peer `{}` (latency = {}ms)",
            sync_peer.node_id(),
            latency.unwrap_or_default().as_millis()
        );

        let mut current_header = self.db().fetch_header_containing_kernel_mmr(local_num_kernels).await?;
        let req = SyncKernelsRequest {
            start: local_num_kernels,
            end_header_hash: to_header.hash().to_vec(),
        };
        let mut kernel_stream = client.sync_kernels(req).await?;

        debug!(
            target: LOG_TARGET,
            "Found header for kernels at mmr pos: {} height: {}",
            local_num_kernels,
            current_header.height()
        );
        let mut kernel_hashes = vec![];
        let db = self.db().clone();
        let mut txn = db.write_transaction();
        let mut mmr_position = local_num_kernels;
        let end = remote_num_kernels;
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);
        while let Some(kernel) = kernel_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
            kernel.verify_signature()?;

            kernel_hashes.push(kernel.hash());

            if mmr_position > end {
                return Err(HorizonSyncError::IncorrectResponse(
                    "Peer sent too many kernels".to_string(),
                ));
            }

            txn.insert_kernel_via_horizon_sync(kernel, *current_header.hash(), mmr_position);
            if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) {
                let num_kernels = kernel_hashes.len();
                debug!(
                    target: LOG_TARGET,
                    "Header #{} ({} kernels, latency: {:.2?})",
                    current_header.height(),
                    num_kernels,
                    latency
                );
                // Validate root
                let block_data = db
                    .fetch_block_accumulated_data(current_header.header().prev_hash)
                    .await?;
                let kernel_pruned_set = block_data.dissolve();
                let mut kernel_mmr = PrunedKernelMmr::new(kernel_pruned_set);

                for hash in kernel_hashes.drain(..) {
                    kernel_mmr.push(hash.to_vec())?;
                }

                let mmr_root = kernel_mmr.get_merkle_root()?;
                if mmr_root.as_slice() != current_header.header().kernel_mr.as_slice() {
                    return Err(HorizonSyncError::InvalidMrRoot {
                        mr_tree: MmrTree::Kernel.to_string(),
                        at_height: current_header.height(),
                        expected_hex: current_header.header().kernel_mr.to_hex(),
                        actual_hex: mmr_root.to_hex(),
                    });
                }

                let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?;
                debug!(
                    target: LOG_TARGET,
                    "Updating block data at height {}",
                    current_header.height()
                );
                txn.update_block_accumulated_data_via_horizon_sync(
                    *current_header.hash(),
                    UpdateBlockAccumulatedData {
                        kernel_hash_set: Some(kernel_hash_set),
                        ..Default::default()
                    },
                );

                txn.commit().await?;
                debug!(
                    target: LOG_TARGET,
                    "Committed {} kernel(s), ({}/{}) {} remaining",
                    num_kernels,
                    mmr_position + 1,
                    end,
                    end.saturating_sub(mmr_position + 1)
                );
                if mmr_position < end.saturating_sub(1) {
                    current_header = db.fetch_chain_header(current_header.height() + 1).await?;
                }
            }
            mmr_position += 1;

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            if mmr_position % 100 == 0 || mmr_position == self.num_kernels {
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
                    current: mmr_position,
                    total: self.num_kernels,
                    sync_peer: sync_peer.clone(),
                });
                self.hooks.call_on_progress_horizon_hooks(info);
            }

            self.check_latency(sync_peer.node_id(), &avg_latency)?;

            last_sync_timer = Instant::now();
        }

        if mmr_position != end {
            return Err(HorizonSyncError::IncorrectResponse(
                "Sync node did not send all kernels requested".to_string(),
            ));
        }
        Ok(())
    }
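
    // The per-header root validation above follows a generic append-then-compare MMR pattern:
    // start from the accumulated (pruned) set at the previous header, append the newly received
    // kernel hashes, and require the recomputed root to equal the `kernel_mr` committed in the
    // header. A hedged, library-agnostic sketch with a hypothetical `Mmr` type:
    //
    //     let mut mmr = Mmr::from_pruned_set(prev_header_pruned_set); // hypothetical constructor
    //     for hash in new_kernel_hashes {
    //         mmr.push(hash)?;
    //     }
    //     if mmr.root()? != header.kernel_mr {
    //         return Err(SyncError::InvalidRoot); // reject this peer's kernels for the header
    //     }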

    fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
        if let Some(avg_latency) = avg_latency.calculate_average_with_min_samples(5) {
            if avg_latency > self.max_latency {
                return Err(HorizonSyncError::MaxLatencyExceeded {
                    peer: peer.clone(),
                    latency: avg_latency,
                    max_latency: self.max_latency,
                });
            }
        }

        Ok(())
    }
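
    // The latency gate above only fires once the rolling window holds at least 5 samples, which
    // smooths out one-off slow responses. A self-contained sketch of the same idea (window of 20
    // and minimum of 5 samples assumed, mirroring the values used in this file):
    //
    //     use std::{collections::VecDeque, time::Duration};
    //
    //     fn average_with_min_samples(window: &VecDeque<Duration>, min: usize) -> Option<Duration> {
    //         if window.len() < min {
    //             return None; // too few samples to judge the peer yet
    //         }
    //         Some(window.iter().sum::<Duration>() / window.len() as u32)
    //     }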

    // Synchronize outputs from the sync peer up to `to_header`.
    #[allow(clippy::too_many_lines)]
    async fn synchronize_outputs(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        info!(target: LOG_TARGET, "Starting output sync from peer {sync_peer}");
        let db = self.db().clone();
        let tip_header = db.fetch_tip_header().await?;

        // Estimate the number of outputs to be downloaded; this cannot be known exactly until the sync is complete.
        let mut current_header = to_header.clone();
        self.num_outputs = 0;
        loop {
            current_header =
                if let Some(previous_header) = db.fetch_header_by_block_hash(current_header.prev_hash).await? {
                    self.num_outputs += current_header
                        .output_smt_size
                        .saturating_sub(previous_header.output_smt_size);
                    previous_header
                } else {
                    break;
                };
            if &current_header.hash() == tip_header.hash() {
                break;
            }
        }

        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
            current: 0,
            total: self.num_outputs,
            sync_peer: sync_peer.clone(),
        });
        self.hooks.call_on_progress_horizon_hooks(info);

        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating output sync with peer `{}`, requesting ~{} outputs, tip_header height `{}`, \
            last_chain_header height `{}` (latency = {}ms)",
            sync_peer.node_id(),
            self.num_outputs,
            tip_header.height(),
            db.fetch_last_chain_header().await?.height(),
            latency.unwrap_or_default().as_millis(),
        );

        let start_chain_header = db.fetch_chain_header(tip_header.height() + 1).await?;
        let req = SyncUtxosRequest {
            start_header_hash: start_chain_header.hash().to_vec(),
            end_header_hash: to_header.hash().to_vec(),
        };
        let mut output_stream = client.sync_utxos(req).await?;

        // let mut txn = db.write_transaction();
        let mut utxo_counter = 0u64;
        let mut stxo_counter = 0u64;
        // let mut output_smt = (*db.inner().smt_write_access()?).clone();
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);

        // let mut inputs_to_delete = Vec::new();
        while let Some(response) = output_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let res: SyncUtxosResponse = response?;

            let output_header_hash = FixedHash::try_from(res.mined_header)
                .map_err(|_| HorizonSyncError::IncorrectResponse("Peer sent no mined header".into()))?;
            let current_header = self
                .db()
                .fetch_header_by_block_hash(output_header_hash)
                .await?
                .ok_or_else(|| {
                    HorizonSyncError::IncorrectResponse("Peer sent mined header we do not know of".into())
                })?;

            let proto_output = res
                .txo
                .ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?;
            match proto_output {
                Txo::Output(output) => {
                    utxo_counter += 1;
                    // Increase the estimated number of outputs to be downloaded (for display purposes only).
                    if utxo_counter >= self.num_outputs {
                        self.num_outputs = utxo_counter + u64::from(current_header.hash() != to_header.hash());
                    }

                    let constants = self.rules.consensus_constants(current_header.height).clone();
                    let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
                    if !output.is_burned() {
                        debug!(
                            target: LOG_TARGET,
                            "UTXO `{}` received from sync peer ({} of {})",
                            output.hash(),
                            utxo_counter,
                            self.num_outputs,
                        );
                        validate_output_version(&constants, &output)?;
                        validate_individual_output(&output, &constants)?;

                        batch_verify_range_proofs(&self.prover, &[&output])?;
                        // let smt_key = NodeKey::try_from(output.commitment.as_bytes())?;
                        // let smt_node = ValueHash::try_from(output.smt_hash(current_header.height).as_slice())?;
                        // if let Err(e) = output_smt.insert(smt_key, smt_node) {
                        //     error!(
                        //         target: LOG_TARGET,
                        //         "Output commitment({}) already in SMT",
                        //         output.commitment.to_hex(),
                        //     );
                        //     return Err(e.into());
                        // }
                        todo!("Implement smt changes");
                        // txn.insert_output_via_horizon_sync(
                        //     output,
                        //     current_header.hash(),
                        //     current_header.height,
                        //     current_header.timestamp.as_u64(),
                        // );

                        // // We have checked the range proof, and we have checked that the linked to header exists.
                        // txn.commit().await?;
                    }
                },
                Txo::Commitment(commitment_bytes) => {
                    stxo_counter += 1;

                    let commitment = CompressedCommitment::from_canonical_bytes(commitment_bytes.as_slice())?;
                    match self
                        .db()
                        .fetch_unspent_output_hash_by_commitment(commitment.clone())
                        .await?
                    {
                        Some(output_hash) => {
                            debug!(
                                target: LOG_TARGET,
                                "STXO hash `{output_hash}` received from sync peer ({stxo_counter})",
                            );
                            // let smt_key = NodeKey::try_from(commitment_bytes.as_slice())?;
                            // match output_smt.delete(&smt_key)? {
                            //     DeleteResult::Deleted(_value_hash) => {},
                            //     DeleteResult::KeyNotFound => {
                            //         error!(
                            //             target: LOG_TARGET,
                            //             "Could not find input({}) in SMT",
                            //             commitment.to_hex(),
                            //         );
                            //         return Err(HorizonSyncError::ChainStorageError(
                            //             ChainStorageError::UnspendableInput,
                            //         ));
                            //     },
                            // };
                            todo!("Implement smt changes");
                            // This will only be committed once the SMT has been verified due to rewind difficulties if
                            // we need to abort the sync
                            // inputs_to_delete.push((output_hash, commitment));
                        },
                        None => {
                            return Err(HorizonSyncError::IncorrectResponse(
                                "Peer sent unknown commitment hash".into(),
                            ))
                        },
                    }
                },
            }

            if utxo_counter % 100 == 0 {
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
                    current: utxo_counter,
                    total: self.num_outputs,
                    sync_peer: sync_peer.clone(),
                });
                self.hooks.call_on_progress_horizon_hooks(info);
            }
            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            last_sync_timer = Instant::now();
        }
        // The SMT can only be verified after all outputs have been downloaded, due to the way we optimize fetching
        // outputs from the sync peer. As an example:
        // 1. Initial sync:
        //    - We request outputs from height 0 to 100 (the tranche)
        //    - The sync peer only returns outputs per block that would still be unspent at height 100 and all inputs
        //      per block. All outputs that were created and spent within the tranche are never returned.
        //    - For example, an output is created in block 50 and spent in block 70. It would be included in the SMT for
        //      headers from height 50 to 69, but due to the optimization, the sync peer would never know about it.
        // 2. Consecutive sync:
        //    - We request outputs from height 101 to 200 (the tranche)
        //    - The sync peer only returns outputs per block that would still be unspent at height 200, as well as all
        //      inputs per block, but in this case, only those inputs that are not an output of the current tranche of
        //      outputs. Similarly, all outputs created and spent within the tranche are never returned.
        //    - For example, an output is created in block 110 and spent in block 180. It would be included in the SMT
        //      for headers from height 110 to 179, but due to the optimization, the sync peer would never know about
        //      it.
        // 3. In both cases it would be impossible to verify the SMT per block, as we would not be able to update the
        //    SMT with the outputs that were created and spent within the tranche.
        todo!("Implement SMT check");
        // HorizonStateSynchronization::<B>::check_output_smt_root_hash(&mut output_smt, to_header)?;

        // // Commit in chunks to avoid locking the database for too long
        // let inputs_to_delete_len = inputs_to_delete.len();
        // for (count, (output_hash, commitment)) in (1..=inputs_to_delete_len).zip(inputs_to_delete.into_iter()) {
        //     txn.prune_output_from_all_dbs(output_hash, commitment, OutputType::default());
        //     if count % 100 == 0 || count == inputs_to_delete_len {
        //         txn.commit().await?;
        //     }
        // }
        // let mut writing_lock_output_smt = db.inner().smt_write_access()?;
        // *writing_lock_output_smt = output_smt;
        // debug!(
        //     target: LOG_TARGET,
        //     "Finished syncing TXOs: {} unspent and {} spent downloaded in {:.2?}",
        //     utxo_counter,
        //     stxo_counter,
        //     timer.elapsed()
        // );
        // Ok(())
    }

    // Helper function to check the output SMT root hash against the expected root hash.
    // fn check_output_smt_root_hash(output_smt: &LmdbTreeReader, header: &BlockHeader) -> Result<(), HorizonSyncError>
    // {     let tree = JellyfishMerkleTree::<_, SmtHasher>::new(output_smt);
    //     let root = tree.get_root_hash(header.height).map_err(|e| HorizonSyncError::SMTError(()))
    //     if root != header.output_mr {
    //         warn!(
    //             target: LOG_TARGET,
    //             "Target root(#{}) did not match expected (#{})",
    //                 header.output_mr.to_hex(),
    //                 root.to_hex(),
    //         );
    //         return Err(HorizonSyncError::InvalidMrRoot {
    //             mr_tree: "UTXO SMT".to_string(),
    //             at_height: header.height,
    //             expected_hex: header.output_mr.to_hex(),
    //             actual_hex: root.to_hex(),
    //         });
    //     }
    //     Ok(())
    // }

    // Finalize the horizon state synchronization by setting the chain metadata to the local tip and committing
    // the horizon state to the blockchain backend.
    async fn finalize_horizon_sync(&mut self, sync_peer: &SyncPeer) -> Result<(), HorizonSyncError> {
        debug!(target: LOG_TARGET, "Validating horizon state");

        self.hooks.call_on_progress_horizon_hooks(HorizonSyncInfo::new(
            vec![sync_peer.node_id().clone()],
            HorizonSyncStatus::Finalizing,
        ));

        let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
        let (calc_utxo_sum, calc_kernel_sum, calc_burned_sum) = self.calculate_commitment_sums(&header).await?;

        self.final_state_validator
            .validate(
                &*self.db().inner().db_read_access()?,
                header.height(),
                &calc_utxo_sum,
                &calc_kernel_sum,
                &calc_burned_sum,
            )
            .map_err(HorizonSyncError::FinalStateValidationFailed)?;

        let metadata = self.db().get_chain_metadata().await?;
        info!(
            target: LOG_TARGET,
            "Horizon state validation succeeded! Committing horizon state."
        );
        self.db()
            .write_transaction()
            .set_best_block(
                header.height(),
                *header.hash(),
                header.accumulated_data().total_accumulated_difficulty,
                *metadata.best_block_hash(),
                header.timestamp(),
            )
            .set_pruned_height(header.height())
            .set_horizon_data(calc_kernel_sum, calc_utxo_sum)
            .commit()
            .await?;

        Ok(())
    }

    /// Returns (UTXO sum, Kernel sum, Burned sum)
    async fn calculate_commitment_sums(
        &mut self,
        header: &ChainHeader,
    ) -> Result<(CompressedCommitment, CompressedCommitment, CompressedCommitment), HorizonSyncError> {
        let mut utxo_sum = HomomorphicCommitment::default();
        let mut kernel_sum = HomomorphicCommitment::default();
        let mut burned_sum = HomomorphicCommitment::default();

        let mut prev_kernel_mmr = 0;

        let height = header.height();
        let db = self.db().inner().clone();
        let header_hash = *header.hash();
        task::spawn_blocking(move || {
            for h in 0..=height {
                let curr_header = db.fetch_chain_header(h)?;
                trace!(
                    target: LOG_TARGET,
                    "Fetching utxos from db: height:{}",
                    curr_header.height(),
                );
                let utxos = db.fetch_outputs_in_block_with_spend_state(*curr_header.hash(), Some(header_hash))?;
                debug!(
                    target: LOG_TARGET,
                    "{} output(s) loaded for height {}",
                    utxos.len(),
                    curr_header.height()
                );
                trace!(
                    target: LOG_TARGET,
                    "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
                    curr_header.height(),
                    curr_header.header().kernel_mmr_size,
                    prev_kernel_mmr,
                    curr_header.header().kernel_mmr_size.saturating_sub(1)
                );

                trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
                for (u, spent) in utxos {
                    if !spent {
                        utxo_sum = &u.commitment.to_commitment()? + &utxo_sum;
                    }
                }

                let kernels = db.fetch_kernels_in_block(*curr_header.hash())?;
                trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
                for k in kernels {
                    kernel_sum = &k.excess.to_commitment()? + &kernel_sum;
                    if k.is_burned() {
                        burned_sum = &(k.get_burn_commitment()?.to_commitment()?) + &burned_sum;
                    }
                }
                prev_kernel_mmr = curr_header.header().kernel_mmr_size;

                if h % 1000 == 0 && height != 0 {
                    debug!(
                        target: LOG_TARGET,
                        "Final Validation: {:.2}% complete. Height: {} sync",
                        (h as f32 / height as f32) * 100.0,
                        h,
                    );
                }
            }

            Ok((
                CompressedCommitment::from_commitment(utxo_sum),
                CompressedCommitment::from_commitment(kernel_sum),
                CompressedCommitment::from_commitment(burned_sum),
            ))
        })
        .await?
    }

932
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
933
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
×
934
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
×
UNCOV
935
            self.sync_peers.remove(pos);
×
UNCOV
936
        }
×
937
    }
×
938

939
    // Helper function to get the index to the node_id inside of the vec of peers
940
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
×
941
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
×
UNCOV
942
    }
×
943

944
    #[inline]
945
    fn db(&self) -> &AsyncBlockchainDb<B> {
×
946
        &self.db
×
UNCOV
947
    }
×
948
}
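
// A hedged usage sketch (assumed caller-side values; not part of this file) showing how a caller
// such as the base node state machine might drive this synchronizer:
//
//     let mut horizon_sync = HorizonStateSynchronization::new(
//         config,                // BlockchainSyncConfig
//         db,                    // AsyncBlockchainDb<B>
//         connectivity,          // ConnectivityRequester
//         rules,                 // ConsensusManager
//         &mut sync_peers,       // Vec<SyncPeer> selected beforehand
//         horizon_height,        // pruning horizon height to sync to
//         prover,                // Arc<RangeProofService>
//         final_state_validator, // Arc<dyn FinalHorizonStateValidation<B>>
//     );
//     horizon_sync.on_progress(|info| println!("{info}"));
//     match horizon_sync.synchronize().await {
//         Ok(()) => println!("horizon state synced"),
//         Err(err) => eprintln!("horizon sync failed: {err}"),
//     }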