
tari-project / tari · build 17275382059 (push · github · web-flow)

27 Aug 2025 06:28PM UTC coverage: 60.14% (-0.1%) from 60.274%

chore: new release v5.0.0-pre.8 (#7446)

Description
---
new release

71505 of 118897 relevant lines covered (60.14%)

536444.51 hits per line

Source File

File coverage: 0.0%
/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
//  Copyright 2022, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    cmp,
    convert::{TryFrom, TryInto},
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use tari_common_types::types::{CompressedCommitment, FixedHash, RangeProofService};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
use tari_crypto::commitment::HomomorphicCommitment;
use tari_node_components::blocks::BlockHeader;
use tari_transaction_components::{
    transaction_components::{transaction_output::batch_verify_range_proofs, TransactionKernel, TransactionOutput},
    validation::{aggregate_body::validate_individual_output, helpers::validate_output_version},
    BanPeriod,
};
use tari_utilities::{hex::Hex, ByteArray};
use tokio::task;

use super::error::HorizonSyncError;
use crate::{
    base_node::sync::{
        ban::PeerBanManager,
        hooks::Hooks,
        horizon_state_sync::{HorizonSyncInfo, HorizonSyncStatus},
        rpc,
        rpc::BaseNodeSyncRpcClient,
        BlockchainSyncConfig,
        SyncPeer,
    },
    blocks::{ChainHeader, UpdateBlockAccumulatedData},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree},
    common::rolling_avg::RollingAverageTime,
    consensus::BaseNodeConsensusManager,
    proto::base_node::{sync_utxos_response::Txo, SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse},
    validation::FinalHorizonStateValidation,
    PrunedKernelMmr,
};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct HorizonStateSynchronization<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    rules: BaseNodeConsensusManager,
    sync_peers: &'a mut Vec<SyncPeer>,
    horizon_sync_height: u64,
    prover: Arc<RangeProofService>,
    num_kernels: u64,
    num_outputs: u64,
    hooks: Hooks,
    connectivity: ConnectivityRequester,
    final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
    max_latency: Duration,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        connectivity: ConnectivityRequester,
        rules: BaseNodeConsensusManager,
        sync_peers: &'a mut Vec<SyncPeer>,
        horizon_sync_height: u64,
        prover: Arc<RangeProofService>,
        final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            max_latency: config.initial_max_sync_latency,
            config,
            db,
            rules,
            connectivity,
            sync_peers,
            horizon_sync_height,
            prover,
            num_kernels: 0,
            num_outputs: 0,
            hooks: Hooks::default(),
            final_state_validator,
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(HorizonSyncInfo) + Send + Sync + 'static {
        self.hooks.add_on_progress_horizon_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(), HorizonSyncError> {
        if self.sync_peers.is_empty() {
            return Err(HorizonSyncError::NoSyncPeers);
        }

        debug!(
            target: LOG_TARGET,
            "Preparing database for horizon sync to height #{}", self.horizon_sync_height
        );
        let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
            ChainStorageError::ValueNotFound {
                entity: "Header",
                field: "height",
                value: self.horizon_sync_height.to_string(),
            }
        })?;

        let mut latency_increases_counter = 0;
        loop {
            match self.sync(&to_header).await {
                Ok(()) => return Ok(()),
                Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => {
                    // If we don't have many sync peers to select from, return the listening state and see if we can get
                    // some more.
                    warn!(
                        target: LOG_TARGET,
                        "Slow sync peers detected: {}",
                        self.sync_peers
                            .iter()
                            .map(|p| format!("{} ({:.2?})", p.node_id(), p.latency().unwrap_or_default()))
                            .collect::<Vec<_>>()
                            .join(", ")
                    );
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    }
                    self.max_latency += self.config.max_latency_increase;
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                },
                Err(err) => return Err(err),
            }
        }
    }

    async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync horizon state ({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            match self.connect_and_attempt_sync(&node_id, to_header).await {
                Ok(_) => return Ok(()),
                // Try another peer
                Err(err) => {
                    let ban_reason = HorizonSyncError::get_ban_reason(&err);

                    if let Some(reason) = ban_reason {
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        warn!(target: LOG_TARGET, "{err}");
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let HorizonSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(HorizonSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(HorizonSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(HorizonSyncError::FailedSyncAllPeers)
        }
    }

    async fn connect_and_attempt_sync(
        &mut self,
        node_id: &NodeId,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        // Connect
        let (mut client, sync_peer) = self.connect_sync_peer(node_id).await?;

        // Perform horizon sync
        debug!(target: LOG_TARGET, "Check if pruning is needed");
        self.prune_if_needed().await?;
        self.sync_kernels_and_outputs(sync_peer.clone(), &mut client, to_header)
            .await?;

        // Validate and finalize horizon sync
        self.finalize_horizon_sync(&sync_peer).await?;

        Ok(())
    }

    async fn connect_sync_peer(
        &mut self,
        node_id: &NodeId,
    ) -> Result<(BaseNodeSyncRpcClient, SyncPeer), HorizonSyncError> {
        let peer_index = self
            .get_sync_peer_index(node_id)
            .ok_or(HorizonSyncError::PeerNotFound)?;
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
        self.hooks.call_on_starting_hook(sync_peer);

        let mut conn = self.dial_sync_peer(node_id).await?;
        debug!(
            target: LOG_TARGET,
            "Attempting to synchronize horizon state with `{node_id}`"
        );

        let config = RpcClient::builder()
            .with_deadline(self.config.rpc_deadline)
            .with_deadline_grace_period(Duration::from_secs(5));

        let mut client = conn
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
            .await?;

        let latency = client
            .get_last_request_latency()
            .expect("unreachable panic: last request latency must be set after connect");
        self.sync_peers
            .get_mut(peer_index)
            .expect("Already checked")
            .set_latency(latency);
        if latency > self.max_latency {
            return Err(HorizonSyncError::MaxLatencyExceeded {
                peer: conn.peer_node_id().clone(),
                latency,
                max_latency: self.max_latency,
            });
        }
        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");

        Ok((
            client,
            self.sync_peers.get(peer_index).expect("Already checked").clone(),
        ))
    }

    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
        let timer = Instant::now();
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
        info!(
            target: LOG_TARGET,
            "Successfully dialed sync peer {} in {:.2?}",
            node_id,
            timer.elapsed()
        );
        Ok(conn)
    }

    async fn sync_kernels_and_outputs(
        &mut self,
        sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        // Note: We do not need to rewind kernels if the sync fails due to it being validated when inserted into
        //       the database. Furthermore, these kernels will also be successfully removed when we need to rewind
        //       the blockchain for whatever reason.
        debug!(target: LOG_TARGET, "Synchronizing kernels");
        self.synchronize_kernels(sync_peer.clone(), client, to_header).await?;
        debug!(target: LOG_TARGET, "Synchronizing outputs");
        // let cloned_backup_smt = self.db.inner().smt_read_access()?.clone();
        match self.synchronize_outputs(sync_peer, client, to_header).await {
            Ok(_) => Ok(()),
            Err(err) => {
                // We need to clean up the outputs
                let _ = self.clean_up_failed_output_sync(to_header).await;
                // let mut smt = self.db.inner().smt_write_access()?;
                // *smt = cloned_backup_smt;
                Err(err)
            },
        }
    }

    /// We clean up a failed output sync attempt and ignore any errors that occur during the clean up process.
    async fn clean_up_failed_output_sync(&mut self, to_header: &BlockHeader) {
        let tip_header = if let Ok(header) = self.db.fetch_tip_header().await {
            header
        } else {
            return;
        };
        let db = self.db().clone();
        let mut txn = db.write_transaction();
        let mut current_header = to_header.clone();
        loop {
            if let Ok(outputs) = self.db.fetch_outputs_in_block(current_header.hash()).await {
                for (count, output) in (1..=outputs.len()).zip(outputs.iter()) {
                    // Note: We do not need to clean up the SMT as it was not saved in the database yet, however, we
                    // need to clean up the outputs
                    txn.prune_output_from_all_dbs(
                        output.hash(),
                        output.commitment.clone(),
                        output.features.output_type,
                    );
                    if let Err(e) = txn.commit().await {
                        warn!(
                            target: LOG_TARGET,
                            "Clean up failed sync - prune output from all dbs for header '{}': {}",
                            current_header.hash(), e
                        );
                    }
                    if count % 100 == 0 || count == outputs.len() {
                        if let Err(e) = txn.commit().await {
                            warn!(
                                target: LOG_TARGET,
                                "Clean up failed sync - commit prune outputs for header '{}': {}",
                                current_header.hash(), e
                            );
                        }
                    }
                }
            }
            if let Err(e) = txn.commit().await {
                warn!(
                    target: LOG_TARGET, "Clean up failed output sync - commit delete kernels for header '{}': {}",
                    current_header.hash(), e
                );
            }
            if let Ok(header) = db.fetch_header_by_block_hash(current_header.prev_hash).await {
                if let Some(previous_header) = header {
                    current_header = previous_header;
                } else {
                    warn!(target: LOG_TARGET, "Could not clean up failed output sync, previous_header link missing from db");
                    break;
                }
            } else {
                warn!(
                    target: LOG_TARGET,
                    "Could not clean up failed output sync, header '{}' not in db",
                    current_header.prev_hash.to_hex()
                );
                break;
            }
            if &current_header.hash() == tip_header.hash() {
                debug!(target: LOG_TARGET, "Finished cleaning up failed output sync");
                break;
            }
        }
    }

    async fn prune_if_needed(&mut self) -> Result<(), HorizonSyncError> {
        let local_metadata = self.db.get_chain_metadata().await?;
        let new_prune_height = cmp::min(local_metadata.best_block_height(), self.horizon_sync_height);
        if local_metadata.pruned_height() < new_prune_height {
            debug!(target: LOG_TARGET, "Pruning block chain to height {new_prune_height}");
            self.db.prune_to_height(new_prune_height).await?;
        }

        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn synchronize_kernels(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        info!(target: LOG_TARGET, "Starting kernel sync from peer {sync_peer}");
        let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;

        let remote_num_kernels = to_header.kernel_mmr_size;
        self.num_kernels = remote_num_kernels;

        if local_num_kernels >= remote_num_kernels {
            debug!(target: LOG_TARGET, "Local kernel set already synchronized");
            return Ok(());
        }

        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
            current: local_num_kernels,
            total: remote_num_kernels,
            sync_peer: sync_peer.clone(),
        });
        self.hooks.call_on_progress_horizon_hooks(info);

        debug!(
            target: LOG_TARGET,
            "Requesting kernels from {} to {} ({} remaining)",
            local_num_kernels,
            remote_num_kernels,
            remote_num_kernels.saturating_sub(local_num_kernels),
        );

        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating kernel sync with peer `{}` (latency = {}ms)",
            sync_peer.node_id(),
            latency.unwrap_or_default().as_millis()
        );

        let mut current_header = self.db().fetch_header_containing_kernel_mmr(local_num_kernels).await?;
        let req = SyncKernelsRequest {
            start: local_num_kernels,
            end_header_hash: to_header.hash().to_vec(),
        };
        let mut kernel_stream = client.sync_kernels(req).await?;

        debug!(
            target: LOG_TARGET,
            "Found header for kernels at mmr pos: {} height: {}",
            local_num_kernels,
            current_header.height()
        );
        let mut kernel_hashes = vec![];
        let db = self.db().clone();
        let mut txn = db.write_transaction();
        let mut mmr_position = local_num_kernels;
        let end = remote_num_kernels;
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);
        while let Some(kernel) = kernel_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
            kernel.verify_signature()?;

            kernel_hashes.push(kernel.hash());

            if mmr_position > end {
                return Err(HorizonSyncError::IncorrectResponse(
                    "Peer sent too many kernels".to_string(),
                ));
            }

            txn.insert_kernel_via_horizon_sync(kernel, *current_header.hash(), mmr_position);
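            // Illustrative example (added commentary): `kernel_mmr_size` in a header is cumulative, so
            // the kernels belonging to a header occupy the MMR positions from the previous header's
            // `kernel_mmr_size` up to its own `kernel_mmr_size - 1`. For example, if the previous header
            // has kernel_mmr_size = 57 and the current header has kernel_mmr_size = 60, positions 57, 58
            // and 59 belong to the current header, and the kernel MMR root check below runs once
            // position 59 (kernel_mmr_size - 1) has been inserted.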
            if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) {
                let num_kernels = kernel_hashes.len();
                debug!(
                    target: LOG_TARGET,
                    "Header #{} ({} kernels, latency: {:.2?})",
                    current_header.height(),
                    num_kernels,
                    latency
                );
                // Validate root
                let block_data = db
                    .fetch_block_accumulated_data(current_header.header().prev_hash)
                    .await?;
                let kernel_pruned_set = block_data.dissolve();
                let mut kernel_mmr = PrunedKernelMmr::new(kernel_pruned_set);

                for hash in kernel_hashes.drain(..) {
                    kernel_mmr.push(hash.to_vec())?;
                }

                let mmr_root = kernel_mmr.get_merkle_root()?;
                if mmr_root.as_slice() != current_header.header().kernel_mr.as_slice() {
                    return Err(HorizonSyncError::InvalidMrRoot {
                        mr_tree: MmrTree::Kernel.to_string(),
                        at_height: current_header.height(),
                        expected_hex: current_header.header().kernel_mr.to_hex(),
                        actual_hex: mmr_root.to_hex(),
                    });
                }

                let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?;
                debug!(
                    target: LOG_TARGET,
                    "Updating block data at height {}",
                    current_header.height()
                );
                txn.update_block_accumulated_data_via_horizon_sync(
                    *current_header.hash(),
                    UpdateBlockAccumulatedData {
                        kernel_hash_set: Some(kernel_hash_set),
                        ..Default::default()
                    },
                );

                txn.commit().await?;
                debug!(
                    target: LOG_TARGET,
                    "Committed {} kernel(s), ({}/{}) {} remaining",
                    num_kernels,
                    mmr_position + 1,
                    end,
                    end.saturating_sub(mmr_position + 1)
                );
                if mmr_position < end.saturating_sub(1) {
                    current_header = db.fetch_chain_header(current_header.height() + 1).await?;
                }
            }
            mmr_position += 1;

            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            if mmr_position % 100 == 0 || mmr_position == self.num_kernels {
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
                    current: mmr_position,
                    total: self.num_kernels,
                    sync_peer: sync_peer.clone(),
                });
                self.hooks.call_on_progress_horizon_hooks(info);
            }

            self.check_latency(sync_peer.node_id(), &avg_latency)?;

            last_sync_timer = Instant::now();
        }

        if mmr_position != end {
            return Err(HorizonSyncError::IncorrectResponse(
                "Sync node did not send all kernels requested".to_string(),
            ));
        }
        Ok(())
    }

    fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
        if let Some(avg_latency) = avg_latency.calculate_average_with_min_samples(5) {
            if avg_latency > self.max_latency {
                return Err(HorizonSyncError::MaxLatencyExceeded {
                    peer: peer.clone(),
                    latency: avg_latency,
                    max_latency: self.max_latency,
                });
            }
        }

        Ok(())
    }
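    // Illustrative example (added commentary): the rolling average only gates a peer once at least
    // 5 samples have been collected. For example, with max_latency = 2s and recent samples of
    // [1.5s, 1.8s, 2.1s, 2.4s, 2.7s], the average is 2.1s, which exceeds 2s, so check_latency would
    // return MaxLatencyExceeded for that peer.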

    // Synchronize outputs, returning true if any keys were deleted from the output SMT.
    #[allow(clippy::too_many_lines)]
    async fn synchronize_outputs(
        &mut self,
        mut sync_peer: SyncPeer,
        client: &mut rpc::BaseNodeSyncRpcClient,
        to_header: &BlockHeader,
    ) -> Result<(), HorizonSyncError> {
        info!(target: LOG_TARGET, "Starting output sync from peer {sync_peer}");
        let db = self.db().clone();
        let tip_header = db.fetch_tip_header().await?;

        // Estimate the number of outputs to be downloaded; this cannot be known exactly until the sync is complete.
        let mut current_header = to_header.clone();
        self.num_outputs = 0;
        loop {
            current_header =
                if let Some(previous_header) = db.fetch_header_by_block_hash(current_header.prev_hash).await? {
                    self.num_outputs += current_header
                        .output_smt_size
                        .saturating_sub(previous_header.output_smt_size);
                    previous_header
                } else {
                    break;
                };
            if &current_header.hash() == tip_header.hash() {
                break;
            }
        }

        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
            current: 0,
            total: self.num_outputs,
            sync_peer: sync_peer.clone(),
        });
        self.hooks.call_on_progress_horizon_hooks(info);

        let latency = client.get_last_request_latency();
        debug!(
            target: LOG_TARGET,
            "Initiating output sync with peer `{}`, requesting ~{} outputs, tip_header height `{}`, \
            last_chain_header height `{}` (latency = {}ms)",
            sync_peer.node_id(),
            self.num_outputs,
            tip_header.height(),
            db.fetch_last_chain_header().await?.height(),
            latency.unwrap_or_default().as_millis(),
        );

        let start_chain_header = db.fetch_chain_header(tip_header.height() + 1).await?;
        let req = SyncUtxosRequest {
            start_header_hash: start_chain_header.hash().to_vec(),
            end_header_hash: to_header.hash().to_vec(),
        };
        let mut output_stream = client.sync_utxos(req).await?;

        // let mut txn = db.write_transaction();
        let mut utxo_counter = 0u64;
        let mut stxo_counter = 0u64;
        // let mut output_smt = (*db.inner().smt_write_access()?).clone();
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);

        // let mut inputs_to_delete = Vec::new();
        while let Some(response) = output_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let res: SyncUtxosResponse = response?;

            let output_header_hash = FixedHash::try_from(res.mined_header)
                .map_err(|_| HorizonSyncError::IncorrectResponse("Peer sent no mined header".into()))?;
            let current_header = self
                .db()
                .fetch_header_by_block_hash(output_header_hash)
                .await?
                .ok_or_else(|| {
                    HorizonSyncError::IncorrectResponse("Peer sent mined header we do not know of".into())
                })?;

            let proto_output = res
                .txo
                .ok_or_else(|| HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into()))?;
            match proto_output {
                Txo::Output(output) => {
                    utxo_counter += 1;
                    // Increase the estimate number of outputs to be downloaded (for display purposes only).
                    if utxo_counter >= self.num_outputs {
                        self.num_outputs = utxo_counter + u64::from(current_header.hash() != to_header.hash());
                    }

                    let constants = self.rules.consensus_constants(current_header.height).clone();
                    let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
                    if !output.is_burned() {
                        debug!(
                            target: LOG_TARGET,
                            "UTXO `{}` received from sync peer ({} of {})",
                            output.hash(),
                            utxo_counter,
                            self.num_outputs,
                        );
                        validate_output_version(&constants, &output)?;
                        validate_individual_output(&output, &constants)?;

                        batch_verify_range_proofs(&self.prover, &[&output])?;
                        // let smt_key = NodeKey::try_from(output.commitment.as_bytes())?;
                        // let smt_node = ValueHash::try_from(output.smt_hash(current_header.height).as_slice())?;
                        // if let Err(e) = output_smt.insert(smt_key, smt_node) {
                        //     error!(
                        //         target: LOG_TARGET,
                        //         "Output commitment({}) already in SMT",
                        //         output.commitment.to_hex(),
                        //     );
                        //     return Err(e.into());
                        // }
                        todo!("Implement smt changes");
                        // txn.insert_output_via_horizon_sync(
                        //     output,
                        //     current_header.hash(),
                        //     current_header.height,
                        //     current_header.timestamp.as_u64(),
                        // );

                        // // We have checked the range proof, and we have checked that the linked to header exists.
                        // txn.commit().await?;
                    }
                },
                Txo::Commitment(commitment_bytes) => {
                    stxo_counter += 1;

                    let commitment = CompressedCommitment::from_canonical_bytes(commitment_bytes.as_slice())?;
                    match self
                        .db()
                        .fetch_unspent_output_hash_by_commitment(commitment.clone())
                        .await?
                    {
                        Some(output_hash) => {
                            debug!(
                                target: LOG_TARGET,
                                "STXO hash `{output_hash}` received from sync peer ({stxo_counter})",
                            );
                            // let smt_key = NodeKey::try_from(commitment_bytes.as_slice())?;
                            // match output_smt.delete(&smt_key)? {
                            //     DeleteResult::Deleted(_value_hash) => {},
                            //     DeleteResult::KeyNotFound => {
                            //         error!(
                            //             target: LOG_TARGET,
                            //             "Could not find input({}) in SMT",
                            //             commitment.to_hex(),
                            //         );
                            //         return Err(HorizonSyncError::ChainStorageError(
                            //             ChainStorageError::UnspendableInput,
                            //         ));
                            //     },
                            // };
                            todo!("Implement smt changes");
                            // This will only be committed once the SMT has been verified due to rewind difficulties if
                            // we need to abort the sync
                            // inputs_to_delete.push((output_hash, commitment));
                        },
                        None => {
                            return Err(HorizonSyncError::IncorrectResponse(
                                "Peer sent unknown commitment hash".into(),
                            ))
                        },
                    }
                },
            }

            if utxo_counter % 100 == 0 {
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
                    current: utxo_counter,
                    total: self.num_outputs,
                    sync_peer: sync_peer.clone(),
                });
                self.hooks.call_on_progress_horizon_hooks(info);
            }
            sync_peer.set_latency(latency);
            sync_peer.add_sample(last_sync_timer.elapsed());
            last_sync_timer = Instant::now();
        }
        // The SMT can only be verified after all outputs have been downloaded, due to the way we optimize fetching
        // outputs from the sync peer. As an example:
        // 1. Initial sync:
        //    - We request outputs from height 0 to 100 (the tranche)
        //    - The sync peer only returns outputs per block that would still be unspent at height 100 and all inputs
        //      per block. All outputs that were created and spent within the tranche are never returned.
        //    - For example, an output is created in block 50 and spent in block 70. It would be included in the SMT for
        //      headers from height 50 to 69, but due to the optimization, the sync peer would never know about it.
        // 2. Consecutive sync:
        //    - We request outputs from height 101 to 200 (the tranche)
        //    - The sync peer only returns outputs per block that would still be unspent at height 200, as well as all
        //      inputs per block, but in this case, only those inputs that are not an output of the current tranche of
        //      outputs. Similarly, all outputs created and spent within the tranche are never returned.
        //    - For example, an output is created in block 110 and spent in block 180. It would be included in the SMT
        //      for headers from height 110 to 179, but due to the optimization, the sync peer would never know about
        //      it.
        // 3. In both cases it would be impossible to verify the SMT per block, as we would not be able to update the
        //    SMT with the outputs that were created and spent within the tranche.
        todo!("Implement SMT check");
        // HorizonStateSynchronization::<B>::check_output_smt_root_hash(&mut output_smt, to_header)?;

        // // Commit in chunks to avoid locking the database for too long
        // let inputs_to_delete_len = inputs_to_delete.len();
        // for (count, (output_hash, commitment)) in (1..=inputs_to_delete_len).zip(inputs_to_delete.into_iter()) {
        //     txn.prune_output_from_all_dbs(output_hash, commitment, OutputType::default());
        //     if count % 100 == 0 || count == inputs_to_delete_len {
        //         txn.commit().await?;
        //     }
        // }
        // let mut writing_lock_output_smt = db.inner().smt_write_access()?;
        // *writing_lock_output_smt = output_smt;
        // debug!(
        //     target: LOG_TARGET,
        //     "Finished syncing TXOs: {} unspent and {} spent downloaded in {:.2?}",
        //     utxo_counter,
        //     stxo_counter,
        //     timer.elapsed()
        // );
        // Ok(())
    }

    // Helper function to check the output SMT root hash against the expected root hash.
    // fn check_output_smt_root_hash(output_smt: &LmdbTreeReader, header: &BlockHeader) -> Result<(), HorizonSyncError>
    // {     let tree = JellyfishMerkleTree::<_, SmtHasher>::new(output_smt);
    //     let root = tree.get_root_hash(header.height).map_err(|e| HorizonSyncError::SMTError(()))
    //     if root != header.output_mr {
    //         warn!(
    //             target: LOG_TARGET,
    //             "Target root(#{}) did not match expected (#{})",
    //                 header.output_mr.to_hex(),
    //                 root.to_hex(),
    //         );
    //         return Err(HorizonSyncError::InvalidMrRoot {
    //             mr_tree: "UTXO SMT".to_string(),
    //             at_height: header.height,
    //             expected_hex: header.output_mr.to_hex(),
    //             actual_hex: root.to_hex(),
    //         });
    //     }
    //     Ok(())
    // }

    // Finalize the horizon state synchronization by setting the chain metadata to the local tip and committing
    // the horizon state to the blockchain backend.
    async fn finalize_horizon_sync(&mut self, sync_peer: &SyncPeer) -> Result<(), HorizonSyncError> {
        debug!(target: LOG_TARGET, "Validating horizon state");

        self.hooks.call_on_progress_horizon_hooks(HorizonSyncInfo::new(
            vec![sync_peer.node_id().clone()],
            HorizonSyncStatus::Finalizing,
        ));

        let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
        let (calc_utxo_sum, calc_kernel_sum, calc_burned_sum) = self.calculate_commitment_sums(&header).await?;

        self.final_state_validator
            .validate(
                &*self.db().inner().db_read_access()?,
                header.height(),
                &calc_utxo_sum,
                &calc_kernel_sum,
                &calc_burned_sum,
            )
            .map_err(HorizonSyncError::FinalStateValidationFailed)?;

        let metadata = self.db().get_chain_metadata().await?;
        info!(
            target: LOG_TARGET,
            "Horizon state validation succeeded! Committing horizon state."
        );
        self.db()
            .write_transaction()
            .set_best_block(
                header.height(),
                *header.hash(),
                header.accumulated_data().total_accumulated_difficulty,
                *metadata.best_block_hash(),
                header.timestamp(),
            )
            .set_pruned_height(header.height())
            .set_horizon_data(calc_kernel_sum, calc_utxo_sum)
            .commit()
            .await?;

        Ok(())
    }

    /// (UTXO sum, Kernel sum, Burned sum)
    async fn calculate_commitment_sums(
        &mut self,
        header: &ChainHeader,
    ) -> Result<(CompressedCommitment, CompressedCommitment, CompressedCommitment), HorizonSyncError> {
        let mut utxo_sum = HomomorphicCommitment::default();
        let mut kernel_sum = HomomorphicCommitment::default();
        let mut burned_sum = HomomorphicCommitment::default();

        let mut prev_kernel_mmr = 0;

        let height = header.height();
        let db = self.db().inner().clone();
        let header_hash = *header.hash();
        task::spawn_blocking(move || {
            for h in 0..=height {
                let curr_header = db.fetch_chain_header(h)?;
                trace!(
                    target: LOG_TARGET,
                    "Fetching utxos from db: height:{}",
                    curr_header.height(),
                );
                let utxos = db.fetch_outputs_in_block_with_spend_state(*curr_header.hash(), Some(header_hash))?;
                debug!(
                    target: LOG_TARGET,
                    "{} output(s) loaded for height {}",
                    utxos.len(),
                    curr_header.height()
                );
                trace!(
                    target: LOG_TARGET,
                    "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
                    curr_header.height(),
                    curr_header.header().kernel_mmr_size,
                    prev_kernel_mmr,
                    curr_header.header().kernel_mmr_size.saturating_sub(1)
                );

                trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
                for (u, spent) in utxos {
                    if !spent {
                        utxo_sum = &u.commitment.to_commitment()? + &utxo_sum;
                    }
                }

                let kernels = db.fetch_kernels_in_block(*curr_header.hash())?;
                trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
                for k in kernels {
                    kernel_sum = &k.excess.to_commitment()? + &kernel_sum;
                    if k.is_burned() {
                        burned_sum = &(k.get_burn_commitment()?.to_commitment()?) + &burned_sum;
                    }
                }
                prev_kernel_mmr = curr_header.header().kernel_mmr_size;

                if h % 1000 == 0 && height != 0 {
                    debug!(
                        target: LOG_TARGET,
                        "Final Validation: {:.2}% complete. Height: {} sync",
                        (h as f32 / height as f32) * 100.0,
                        h,
                    );
                }
            }

            Ok((
                CompressedCommitment::from_commitment(utxo_sum),
                CompressedCommitment::from_commitment(kernel_sum),
                CompressedCommitment::from_commitment(burned_sum),
            ))
        })
        .await?
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index to the node_id inside of the vec of peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }

    #[inline]
    fn db(&self) -> &AsyncBlockchainDb<B> {
        &self.db
    }
}
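
For orientation, here is a minimal usage sketch. It is illustrative only and not part of the file above: the free function name and the way its dependencies are obtained are assumptions; only the constructor, the two hook registrations and `synchronize()` come from the listed code.

// Hypothetical driver sketch (illustrative only), written against the public API defined above.
#[allow(dead_code)]
async fn example_drive_horizon_sync<B: BlockchainBackend + 'static>(
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    connectivity: ConnectivityRequester,
    rules: BaseNodeConsensusManager,
    sync_peers: &mut Vec<SyncPeer>,
    horizon_sync_height: u64,
    prover: Arc<RangeProofService>,
    validator: Arc<dyn FinalHorizonStateValidation<B>>,
) -> Result<(), HorizonSyncError> {
    let mut horizon_sync = HorizonStateSynchronization::new(
        config,
        db,
        connectivity,
        rules,
        sync_peers,
        horizon_sync_height,
        prover,
        validator,
    );
    // Hooks are registered before starting; the synchronizer invokes them itself once a peer attempt begins.
    horizon_sync.on_starting(|peer| debug!(target: LOG_TARGET, "Horizon sync starting with peer {}", peer));
    horizon_sync.on_progress(|progress| {
        // A real caller would forward this to its state machine or UI; here it is simply dropped.
        let _ = progress;
    });
    horizon_sync.synchronize().await
}

Hooks are registered up front because the synchronizer, not the caller, decides when a sync attempt against a given peer actually starts and how far it has progressed.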