• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 24836861553

23 Apr 2026 01:04PM UTC coverage: 60.949% (-0.07%) from 61.023%
24836861553

push

github

SWvheerden
chore: new release v5.3.0-pre.12

70768 of 116111 relevant lines covered (60.95%)

224032.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
1
//  Copyright 2022, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    cmp,
25
    collections::BTreeMap,
26
    convert::{TryFrom, TryInto},
27
    sync::Arc,
28
    time::{Duration, Instant},
29
};
30

31
use futures::StreamExt;
32
use log::*;
33
use tari_common_types::types::{CompressedCommitment, FixedHash, RangeProofService};
34
use tari_comms::{
35
    PeerConnection,
36
    connectivity::ConnectivityRequester,
37
    peer_manager::NodeId,
38
    protocol::rpc::{RpcClient, RpcError, RpcStatus},
39
};
40
use tari_crypto::commitment::HomomorphicCommitment;
41
use tari_node_components::blocks::{BlockHeader, ChainHeader};
42
use tari_transaction_components::{
43
    BanPeriod,
44
    transaction_components::{TransactionKernel, TransactionOutput, transaction_output::batch_verify_range_proofs},
45
    validation::{aggregate_body::validate_individual_output, helpers::validate_output_version},
46
};
47
use tari_utilities::{ByteArray, hex::Hex};
48
use tokio::task;
49

50
use super::error::HorizonSyncError;
51
use crate::{
52
    PrunedKernelMmr,
53
    base_node::sync::{
54
        BlockchainSyncConfig,
55
        SyncPeer,
56
        ban::PeerBanManager,
57
        hooks::Hooks,
58
        horizon_state_sync::{HorizonSyncInfo, HorizonSyncStatus},
59
        rpc,
60
        rpc::BaseNodeSyncRpcClient,
61
    },
62
    blocks::UpdateBlockAccumulatedData,
63
    chain_storage::{
64
        BlockchainBackend,
65
        ChainStorageError,
66
        HorizonStateTreeUpdate,
67
        HorizonSyncOutputCheckpoint,
68
        MmrTree,
69
        async_db::AsyncBlockchainDb,
70
    },
71
    common::rolling_avg::RollingAverageTime,
72
    consensus::BaseNodeConsensusManager,
73
    proto::base_node::{SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse, sync_utxos_response::Txo},
74
    validation::FinalHorizonStateValidation,
75
};
76

77
const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";
78

79
const MAX_LATENCY_INCREASES: usize = 5;
80
const HORIZON_SYNC_BATCH_SIZE: usize = 10_000;
81
const PROGRESS_REPORT_INTERVAL: u64 = 100;
82
const HORIZON_SYNC_TRANCHE_SIZE: u64 = 5_000;
83

84
pub struct HorizonStateSynchronization<'a, B> {
85
    config: BlockchainSyncConfig,
86
    db: AsyncBlockchainDb<B>,
87
    rules: BaseNodeConsensusManager,
88
    sync_peers: &'a mut Vec<SyncPeer>,
89
    horizon_sync_height: u64,
90
    prover: Arc<RangeProofService>,
91
    num_kernels: u64,
92
    num_outputs: u64,
93
    hooks: Hooks,
94
    connectivity: ConnectivityRequester,
95
    final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
96
    max_latency: Duration,
97
    peer_ban_manager: PeerBanManager,
98
}
99

100
impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
101
    #[allow(clippy::too_many_arguments)]
102
    pub fn new(
×
103
        config: BlockchainSyncConfig,
×
104
        db: AsyncBlockchainDb<B>,
×
105
        connectivity: ConnectivityRequester,
×
106
        rules: BaseNodeConsensusManager,
×
107
        sync_peers: &'a mut Vec<SyncPeer>,
×
108
        horizon_sync_height: u64,
×
109
        prover: Arc<RangeProofService>,
×
110
        final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
×
111
    ) -> Self {
×
112
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
×
113
        Self {
×
114
            max_latency: config.initial_max_sync_latency,
×
115
            config,
×
116
            db,
×
117
            rules,
×
118
            connectivity,
×
119
            sync_peers,
×
120
            horizon_sync_height,
×
121
            prover,
×
122
            num_kernels: 0,
×
123
            num_outputs: 0,
×
124
            hooks: Hooks::default(),
×
125
            final_state_validator,
×
126
            peer_ban_manager,
×
127
        }
×
128
    }
×
129

130
    pub fn on_starting<H>(&mut self, hook: H)
×
131
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
×
132
        self.hooks.add_on_starting_hook(hook);
×
133
    }
×
134

135
    pub fn on_progress<H>(&mut self, hook: H)
×
136
    where H: Fn(HorizonSyncInfo) + Send + Sync + 'static {
×
137
        self.hooks.add_on_progress_horizon_hook(hook);
×
138
    }
×
139

140
    pub async fn synchronize(&mut self) -> Result<(), HorizonSyncError> {
×
141
        if self.sync_peers.is_empty() {
×
142
            return Err(HorizonSyncError::NoSyncPeers);
×
143
        }
×
144

145
        debug!(
×
146
            target: LOG_TARGET,
×
147
            "Preparing database for horizon sync to height #{}", self.horizon_sync_height
148
        );
149
        let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
×
150
            ChainStorageError::ValueNotFound {
×
151
                entity: "Header",
×
152
                field: "height",
×
153
                value: self.horizon_sync_height.to_string(),
×
154
            }
×
155
        })?;
×
156

157
        // Hold `Arc<NodeId>` sync-list handles for each candidate peer. While these guards are
158
        // alive the connectivity manager marks the peers as "in use by sync", which prevents
159
        // opportunistic disconnects (e.g. DhtConnectivity random-pool pruning). The guards are
160
        // dropped automatically when this function returns.
161
        let mut _sync_guards: Vec<Arc<NodeId>> = Vec::with_capacity(self.sync_peers.len());
×
162
        for peer in self.sync_peers.iter() {
×
163
            match self.connectivity.add_peer_to_sync_list(peer.node_id().clone()).await {
×
164
                Ok(handle) => _sync_guards.push(handle),
×
165
                Err(e) => debug!(
×
166
                    target: LOG_TARGET,
×
167
                    "Failed to register sync peer {} on sync list: {e}", peer.node_id()
×
168
                ),
169
            }
170
        }
171

172
        self.synchronize_inner(&to_header).await
×
173
    }
×
174

175
    async fn synchronize_inner(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
×
176
        let mut latency_increases_counter = 0;
×
177
        loop {
178
            match self.sync(to_header).await {
×
179
                Ok(()) => return Ok(()),
×
180
                Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => {
×
181
                    // If we don't have many sync peers to select from, return the listening state and see if we can get
182
                    // some more.
183
                    warn!(
×
184
                        target: LOG_TARGET,
×
185
                        "Slow sync peers detected: {}",
186
                        self.sync_peers
×
187
                            .iter()
×
188
                            .map(|p| format!("{} ({:.2?})", p.node_id(), p.latency().unwrap_or_default()))
×
189
                            .collect::<Vec<_>>()
×
190
                            .join(", ")
×
191
                    );
192
                    if self.sync_peers.len() < 2 {
×
193
                        return Err(err);
×
194
                    }
×
195
                    self.max_latency += self.config.max_latency_increase;
×
196
                    latency_increases_counter += 1;
×
197
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
198
                        return Err(err);
×
199
                    }
×
200
                },
201
                Err(err) => return Err(err),
×
202
            }
203
        }
204
    }
×
205

206
    async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
×
207
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
×
208
        info!(
×
209
            target: LOG_TARGET,
×
210
            "Attempting to sync horizon state ({} sync peers)",
211
            sync_peer_node_ids.len()
×
212
        );
213
        let mut latency_counter = 0usize;
×
214
        for node_id in sync_peer_node_ids {
×
215
            match self.connect_and_attempt_sync(&node_id, to_header).await {
×
216
                Ok(_) => return Ok(()),
×
217
                // Try another peer
218
                Err(err) => {
×
219
                    let ban_reason = HorizonSyncError::get_ban_reason(&err);
×
220

221
                    if let Some(reason) = ban_reason {
×
222
                        let duration = match reason.ban_duration {
×
223
                            BanPeriod::Short => self.config.short_ban_period,
×
224
                            BanPeriod::Long => self.config.ban_period,
×
225
                        };
226
                        warn!(target: LOG_TARGET, "{err}");
×
227
                        self.peer_ban_manager
×
228
                            .ban_peer_if_required(&node_id, reason.reason, duration)
×
229
                            .await;
×
230
                    }
×
231
                    if let HorizonSyncError::MaxLatencyExceeded { .. } = err {
×
232
                        latency_counter += 1;
×
233
                    } else {
×
234
                        self.remove_sync_peer(&node_id);
×
235
                    }
×
236
                },
237
            }
238
        }
239

240
        if self.sync_peers.is_empty() {
×
241
            Err(HorizonSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
×
242
        } else if latency_counter >= self.sync_peers.len() {
×
243
            Err(HorizonSyncError::AllSyncPeersExceedLatency)
×
244
        } else {
245
            Err(HorizonSyncError::FailedSyncAllPeers)
×
246
        }
247
    }
×
248

249
    async fn connect_and_attempt_sync(
×
250
        &mut self,
×
251
        node_id: &NodeId,
×
252
        to_header: &BlockHeader,
×
253
    ) -> Result<(), HorizonSyncError> {
×
254
        // Connect
255
        let (mut client, sync_peer) = self.connect_sync_peer(node_id).await?;
×
256

257
        // Perform horizon sync
258
        debug!(target: LOG_TARGET, "Check if pruning is needed");
×
259
        self.prune_if_needed().await?;
×
260
        self.sync_kernels_and_outputs(sync_peer.clone(), &mut client, to_header)
×
261
            .await?;
×
262

263
        // Validate and finalize horizon sync
264
        self.finalize_horizon_sync(&sync_peer).await?;
×
265

266
        Ok(())
×
267
    }
×
268

269
    async fn connect_sync_peer(
×
270
        &mut self,
×
271
        node_id: &NodeId,
×
272
    ) -> Result<(BaseNodeSyncRpcClient, SyncPeer), HorizonSyncError> {
×
273
        let peer_index = self
×
274
            .get_sync_peer_index(node_id)
×
275
            .ok_or(HorizonSyncError::PeerNotFound)?;
×
276
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
×
277
        self.hooks.call_on_starting_hook(sync_peer);
×
278

279
        let mut conn = self.dial_sync_peer(node_id).await?;
×
280
        debug!(
×
281
            target: LOG_TARGET,
×
282
            "Attempting to synchronize horizon state with `{node_id}`"
283
        );
284
        // Defensive: the connection may have been torn down by another subsystem between
285
        // dial returning and this point (e.g. DhtConnectivity pruning).
286
        if !conn.is_connected() {
×
287
            warn!(
×
288
                target: LOG_TARGET,
×
289
                "Sync peer `{node_id}` was disconnected before RPC negotiation could begin"
290
            );
291
            return Err(HorizonSyncError::RpcError(RpcError::ClientClosed));
×
292
        }
×
293

294
        let config = RpcClient::builder()
×
295
            .with_deadline(self.config.rpc_deadline)
×
296
            .with_deadline_grace_period(Duration::from_secs(5));
×
297

298
        // Bound RPC negotiation so a stuck negotiation cannot wedge the sync loop.
299
        let mut client = tokio::time::timeout(
×
300
            self.config.rpc_deadline,
×
301
            conn.connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config),
×
302
        )
×
303
        .await
×
304
        .map_err(|_| HorizonSyncError::RpcError(RpcError::ReplyTimeout))??;
×
305

306
        let latency = client
×
307
            .get_last_request_latency()
×
308
            .expect("unreachable panic: last request latency must be set after connect");
×
309
        self.sync_peers
×
310
            .get_mut(peer_index)
×
311
            .expect("Already checked")
×
312
            .set_latency(latency);
×
313
        if latency > self.max_latency {
×
314
            return Err(HorizonSyncError::MaxLatencyExceeded {
×
315
                peer: conn.peer_node_id().clone(),
×
316
                latency,
×
317
                max_latency: self.max_latency,
×
318
            });
×
319
        }
×
320
        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
×
321

322
        Ok((
×
323
            client,
×
324
            self.sync_peers.get(peer_index).expect("Already checked").clone(),
×
325
        ))
×
326
    }
×
327

328
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
×
329
        let timer = Instant::now();
×
330
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
×
331
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
×
332
        info!(
×
333
            target: LOG_TARGET,
×
334
            "Successfully dialed sync peer {} in {:.2?}",
335
            node_id,
336
            timer.elapsed()
×
337
        );
338
        Ok(conn)
×
339
    }
×
340

341
    async fn sync_kernels_and_outputs(
×
342
        &mut self,
×
343
        sync_peer: SyncPeer,
×
344
        client: &mut rpc::BaseNodeSyncRpcClient,
×
345
        to_header: &BlockHeader,
×
346
    ) -> Result<(), HorizonSyncError> {
×
347
        // Note: We do not need to rewind kernels if the sync fails due to it being validated when inserted into
348
        //       the database. Furthermore, these kernels will also be successfully removed when we need to rewind
349
        //       the blockchain for whatever reason.
350
        debug!(target: LOG_TARGET, "Synchronizing kernels");
×
351
        self.synchronize_kernels(sync_peer.clone(), client, to_header).await?;
×
352
        debug!(target: LOG_TARGET, "Synchronizing outputs");
×
353
        // let cloned_backup_smt = self.db.inner().smt_read_access()?.clone();
354
        match self.synchronize_outputs(sync_peer, client, to_header).await {
×
355
            Ok(_) => Ok(()),
×
356
            Err(err) => {
×
357
                // We need to clean up the outputs
358
                let _ = self.clean_up_failed_output_sync(to_header).await;
×
359
                // let mut smt = self.db.inner().smt_write_access()?;
360
                // *smt = cloned_backup_smt;
361
                Err(err)
×
362
            },
363
        }
364
    }
×
365

366
    /// Cleanup stops at the last committed checkpoint so that previously-completed tranches are not disturbed.
367
    async fn clean_up_failed_output_sync(&mut self, to_header: &BlockHeader) {
×
368
        // Determine where to stop cleaning. If a tranche checkpoint exists, stop at the checkpoint block
369
        // Otherwise fall back to the current chain tip.
370
        let stop_hash = match self.db.fetch_horizon_sync_output_checkpoint().await {
×
371
            Ok(Some(cp)) => match self.db.fetch_header(cp.checkpoint_height).await {
×
372
                Ok(Some(header)) => Some(header.hash()),
×
373
                _ => None,
×
374
            },
375
            _ => None,
×
376
        };
377
        let stop_hash = match stop_hash {
×
378
            Some(h) => h,
×
379
            None => match self.db.fetch_header(0).await {
×
380
                Ok(Some(header)) => header.hash(),
×
381
                _ => return,
×
382
            },
383
        };
384

385
        let db = self.db().clone();
×
386
        let mut txn = db.write_transaction();
×
387
        let mut current_header = to_header.clone();
×
388
        loop {
389
            if let Ok(outputs) = self.db.fetch_outputs_in_block(current_header.hash()).await {
×
390
                for (count, output) in (1..=outputs.len()).zip(outputs.iter()) {
×
391
                    txn.prune_output_from_all_dbs(
×
392
                        output.hash(),
×
393
                        output.commitment.clone(),
×
394
                        output.features.output_type,
×
395
                    );
396
                    if (count % 100 == 0 || count == outputs.len()) &&
×
397
                        let Err(e) = txn.commit().await
×
398
                    {
399
                        warn!(
×
400
                            target: LOG_TARGET,
×
401
                            "Clean up failed sync - commit prune outputs for header '{}': {}",
402
                            current_header.hash(), e
×
403
                        );
404
                    }
×
405
                }
406
            }
×
407

408
            if let Ok(header) = db.fetch_header_by_block_hash(current_header.prev_hash).await {
×
409
                if let Some(previous_header) = header {
×
410
                    current_header = previous_header;
×
411
                } else {
×
412
                    warn!(target: LOG_TARGET, "Could not clean up failed output sync, previous_header link missing from db");
×
413
                    break;
×
414
                }
415
            } else {
416
                warn!(
×
417
                    target: LOG_TARGET,
×
418
                    "Could not clean up failed output sync, header '{}' not in db",
419
                    current_header.prev_hash.to_hex()
×
420
                );
421
                break;
×
422
            }
423
            if current_header.hash() == stop_hash {
×
424
                debug!(target: LOG_TARGET, "Reached stop point while cleaning up failed output sync");
×
425
                break;
×
426
            }
×
427
        }
428

429
        if let Err(e) = txn.commit().await {
×
430
            warn!(
×
431
                target: LOG_TARGET,
×
432
                "Clean up failed output sync - final commit failed: {}",
433
                e
434
            );
435
        }
×
436
    }
×
437

438
    /// Removes any outputs stored in the given block height range from the database.
439
    /// On a fresh start all `fetch_outputs_in_block` calls return empty, so this is a no-op.
440
    async fn clean_up_height_range(&mut self, start_height: u64, end_height: u64) -> Result<(), HorizonSyncError> {
×
441
        let db = self.db().clone();
×
442
        let mut txn = db.write_transaction();
×
443
        let mut count: u64 = 0;
×
444
        for height in start_height..=end_height {
×
445
            let header = db
×
446
                .fetch_header(height)
×
447
                .await?
×
448
                .ok_or_else(|| ChainStorageError::ValueNotFound {
×
449
                    entity: "Header",
450
                    field: "height",
451
                    value: height.to_string(),
×
452
                })?;
×
453
            let outputs = db.fetch_outputs_in_block(header.hash()).await?;
×
454
            for output in outputs {
×
455
                txn.prune_output_from_all_dbs(output.hash(), output.commitment.clone(), output.features.output_type);
×
456
                count += 1;
×
457
                if count.is_multiple_of(PROGRESS_REPORT_INTERVAL) {
×
458
                    txn.commit().await?;
×
459
                    txn = db.write_transaction();
×
460
                }
×
461
            }
462
        }
463
        txn.commit().await?;
×
464
        if count > 0 {
×
465
            debug!(
×
466
                target: LOG_TARGET,
×
467
                "Cleaned up {} partial output(s) from height range {}-{}", count, start_height, end_height
468
            );
469
        }
×
470
        Ok(())
×
471
    }
×
472

473
    async fn prune_if_needed(&mut self) -> Result<(), HorizonSyncError> {
×
474
        let local_metadata = self.db.get_chain_metadata().await?;
×
475
        let new_prune_height = cmp::min(local_metadata.best_block_height(), self.horizon_sync_height);
×
476
        if local_metadata.pruned_height() < new_prune_height {
×
477
            debug!(target: LOG_TARGET, "Pruning block chain to height {new_prune_height}");
×
478
            self.db.prune_to_height(new_prune_height).await?;
×
479
        }
×
480

481
        Ok(())
×
482
    }
×
483

484
    #[allow(clippy::too_many_lines)]
485
    async fn synchronize_kernels(
×
486
        &mut self,
×
487
        mut sync_peer: SyncPeer,
×
488
        client: &mut rpc::BaseNodeSyncRpcClient,
×
489
        to_header: &BlockHeader,
×
490
    ) -> Result<(), HorizonSyncError> {
×
491
        info!(target: LOG_TARGET, "Starting kernel sync from peer {sync_peer}");
×
492
        let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;
×
493

494
        let remote_num_kernels = to_header.kernel_mmr_size;
×
495
        self.num_kernels = remote_num_kernels;
×
496

497
        if local_num_kernels >= remote_num_kernels {
×
498
            debug!(target: LOG_TARGET, "Local kernel set already synchronized");
×
499
            return Ok(());
×
500
        }
×
501

502
        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
×
503
            current: local_num_kernels,
×
504
            total: remote_num_kernels,
×
505
            sync_peer: sync_peer.clone(),
×
506
        });
×
507
        self.hooks.call_on_progress_horizon_hooks(info);
×
508

509
        debug!(
×
510
            target: LOG_TARGET,
×
511
            "Requesting kernels from {} to {} ({} remaining)",
512
            local_num_kernels,
513
            remote_num_kernels,
514
            remote_num_kernels.saturating_sub(local_num_kernels),
×
515
        );
516

517
        let latency = client.get_last_request_latency();
×
518
        debug!(
×
519
            target: LOG_TARGET,
×
520
            "Initiating kernel sync with peer `{}` (latency = {}ms)",
521
            sync_peer.node_id(),
×
522
            latency.unwrap_or_default().as_millis()
×
523
        );
524

525
        let mut current_header = self.db().fetch_header_containing_kernel_mmr(local_num_kernels).await?;
×
526
        let req = SyncKernelsRequest {
×
527
            start: local_num_kernels,
×
528
            end_header_hash: to_header.hash().to_vec(),
×
529
        };
×
530
        let mut kernel_stream = client.sync_kernels(req).await?;
×
531

532
        debug!(
×
533
            target: LOG_TARGET,
×
534
            "Found header for kernels at mmr pos: {} height: {}",
535
            local_num_kernels,
536
            current_header.height()
×
537
        );
538
        let mut kernel_hashes = vec![];
×
539
        let db = self.db().clone();
×
540
        let mut txn = db.write_transaction();
×
541
        let mut mmr_position = local_num_kernels;
×
542
        let end = remote_num_kernels;
×
543
        let mut last_sync_timer = Instant::now();
×
544
        let mut avg_latency = RollingAverageTime::new(20);
×
545
        while let Some(kernel) = kernel_stream.next().await {
×
546
            let latency = last_sync_timer.elapsed();
×
547
            avg_latency.add_sample(latency);
×
548
            let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
×
549
            kernel.verify_signature()?;
×
550

551
            kernel_hashes.push(kernel.hash());
×
552

553
            if mmr_position > end {
×
554
                return Err(HorizonSyncError::IncorrectResponse(
×
555
                    "Peer sent too many kernels".to_string(),
×
556
                ));
×
557
            }
×
558

559
            txn.insert_kernel_via_horizon_sync(kernel, *current_header.hash(), mmr_position);
×
560
            if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) {
×
561
                let num_kernels = kernel_hashes.len();
×
562
                debug!(
×
563
                    target: LOG_TARGET,
×
564
                    "Header #{} ({} kernels, latency: {:.2?})",
565
                    current_header.height(),
×
566
                    num_kernels,
567
                    latency
568
                );
569
                // Validate root
570
                let block_data = db
×
571
                    .fetch_block_accumulated_data(current_header.header().prev_hash)
×
572
                    .await?;
×
573
                let kernel_pruned_set = block_data.dissolve();
×
574
                let mut kernel_mmr = PrunedKernelMmr::new(kernel_pruned_set);
×
575

576
                for hash in kernel_hashes.drain(..) {
×
577
                    kernel_mmr.push(hash.to_vec())?;
×
578
                }
579

580
                let mmr_root = kernel_mmr.get_merkle_root()?;
×
581
                if mmr_root.as_slice() != current_header.header().kernel_mr.as_slice() {
×
582
                    return Err(HorizonSyncError::InvalidMrRoot {
×
583
                        mr_tree: MmrTree::Kernel.to_string(),
×
584
                        at_height: current_header.height(),
×
585
                        expected_hex: current_header.header().kernel_mr.to_hex(),
×
586
                        actual_hex: mmr_root.to_hex(),
×
587
                    });
×
588
                }
×
589

590
                let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?;
×
591
                debug!(
×
592
                    target: LOG_TARGET,
×
593
                    "Updating block data at height {}",
594
                    current_header.height()
×
595
                );
596
                txn.update_block_accumulated_data_via_horizon_sync(
×
597
                    *current_header.hash(),
×
598
                    UpdateBlockAccumulatedData {
×
599
                        kernel_hash_set: Some(kernel_hash_set),
×
600
                        ..Default::default()
×
601
                    },
×
602
                );
603

604
                txn.commit().await?;
×
605
                debug!(
×
606
                    target: LOG_TARGET,
×
607
                    "Committed {} kernel(s), ({}/{}) {} remaining",
608
                    num_kernels,
609
                    mmr_position + 1,
×
610
                    end,
611
                    end.saturating_sub(mmr_position + 1)
×
612
                );
613
                if mmr_position < end.saturating_sub(1) {
×
614
                    current_header = db.fetch_chain_header(current_header.height() + 1).await?;
×
615
                }
×
616
            }
×
617
            mmr_position += 1;
×
618

619
            sync_peer.set_latency(latency);
×
620
            sync_peer.add_sample(last_sync_timer.elapsed());
×
621
            if mmr_position % 100 == 0 || mmr_position == self.num_kernels {
×
622
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
×
623
                    current: mmr_position,
×
624
                    total: self.num_kernels,
×
625
                    sync_peer: sync_peer.clone(),
×
626
                });
×
627
                self.hooks.call_on_progress_horizon_hooks(info);
×
628
            }
×
629

630
            self.check_latency(sync_peer.node_id(), &avg_latency)?;
×
631

632
            last_sync_timer = Instant::now();
×
633
        }
634

635
        if mmr_position != end {
×
636
            return Err(HorizonSyncError::IncorrectResponse(
×
637
                "Sync node did not send all kernels requested".to_string(),
×
638
            ));
×
639
        }
×
640
        Ok(())
×
641
    }
×
642

643
    fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
×
644
        if let Some(avg_latency) = avg_latency.calculate_average_with_min_samples(5) &&
×
645
            avg_latency > self.max_latency
×
646
        {
647
            return Err(HorizonSyncError::MaxLatencyExceeded {
×
648
                peer: peer.clone(),
×
649
                latency: avg_latency,
×
650
                max_latency: self.max_latency,
×
651
            });
×
652
        }
×
653

654
        Ok(())
×
655
    }
×
656

657
    // Synchronize outputs in independently-verifiable tranches
658
    #[allow(clippy::too_many_lines)]
659
    async fn synchronize_outputs(
×
660
        &mut self,
×
661
        mut sync_peer: SyncPeer,
×
662
        client: &mut rpc::BaseNodeSyncRpcClient,
×
663
        to_header: &BlockHeader,
×
664
    ) -> Result<(), HorizonSyncError> {
×
665
        info!(target: LOG_TARGET, "Starting output sync from peer {sync_peer}");
×
666
        let db = self.db().clone();
×
667

668
        let stored_checkpoint = db.fetch_horizon_sync_output_checkpoint().await?;
×
669

670
        let checkpoint_height = match stored_checkpoint {
×
671
            Some(ref cp) if cp.sync_target_height == to_header.height && cp.sync_target_hash == to_header.hash() => {
×
672
                match db.fetch_header(cp.checkpoint_height).await? {
×
673
                    Some(header) if header.hash() == cp.checkpoint_hash => {
×
674
                        info!(
×
675
                            target: LOG_TARGET,
×
676
                            "Resuming output sync from checkpoint at height {}, target unchanged",
677
                            cp.checkpoint_height
678
                        );
679
                        Some(cp.checkpoint_height)
×
680
                    },
681
                    _ => {
682
                        warn!(
×
683
                            target: LOG_TARGET,
×
684
                            "Horizon sync checkpoint at height {} is no longer on the canonical chain (reorg \
685
                             detected). Discarding checkpoint and restarting output sync from scratch.",
686
                            cp.checkpoint_height
687
                        );
688
                        db.write_transaction()
×
689
                            .clear_horizon_sync_output_checkpoint()
×
690
                            .commit()
×
691
                            .await?;
×
692
                        None
×
693
                    },
694
                }
695
            },
696
            Some(ref cp) => {
×
697
                warn!(
×
698
                    target: LOG_TARGET,
×
699
                    "Horizon sync target changed from height {} to {}. Discarding checkpoint and cleaning up \
700
                     partial outputs.",
701
                    cp.sync_target_height,
702
                    to_header.height
703
                );
704
                db.write_transaction()
×
705
                    .clear_horizon_sync_output_checkpoint()
×
706
                    .commit()
×
707
                    .await?;
×
708
                if let Ok(Some(cleanup_header)) = db.fetch_header(cp.checkpoint_height).await {
×
709
                    self.clean_up_failed_output_sync(&cleanup_header).await;
×
710
                }
×
711
                None
×
712
            },
713
            None => None,
×
714
        };
715
        let (sync_start_height, mut jmt_version) = match checkpoint_height {
×
716
            Some(h) => {
×
717
                // Only the in-progress (first resumption) tranche may have partial output data.
718
                let first_tranche_end = cmp::min(
×
719
                    (h + 1).saturating_add(HORIZON_SYNC_TRANCHE_SIZE).saturating_sub(1),
×
720
                    to_header.height,
×
721
                );
722
                self.clean_up_height_range(h + 1, first_tranche_end).await?;
×
723
                (h + 1, h)
×
724
            },
725
            None => {
726
                self.clean_up_height_range(0, to_header.height).await?;
×
727
                (0, 0)
×
728
            },
729
        };
730

731
        if sync_start_height > to_header.height {
×
732
            info!(
×
733
                target: LOG_TARGET,
×
734
                "Output sync already complete (sync_start_height {} > horizon height {})",
735
                sync_start_height,
736
                to_header.height
737
            );
738
            return Ok(());
×
739
        }
×
740

741
        self.num_outputs = to_header.output_smt_size;
×
742

743
        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
×
744
            current: 0,
×
745
            total: self.num_outputs,
×
746
            sync_peer: sync_peer.clone(),
×
747
        });
×
748
        self.hooks.call_on_progress_horizon_hooks(info);
×
749

750
        let latency = client.get_last_request_latency();
×
751
        debug!(
×
752
            target: LOG_TARGET,
×
753
            "Initiating output sync with peer `{}`, requesting ~{} outputs from height {} to height {} \
754
            last_chain_header height `{}` (latency = {}ms)",
755
            sync_peer.node_id(),
×
756
            self.num_outputs,
757
            sync_start_height,
758
            to_header.height,
759
            db.fetch_last_chain_header().await?.height(),
×
760
            latency.unwrap_or_default().as_millis(),
×
761
        );
762

763
        let timer = Instant::now();
×
764
        let mut total_utxo_counter = 0u64;
×
765
        let mut total_stxo_counter = 0u64;
×
766
        let mut tranche_start_height = sync_start_height;
×
767

768
        // Process the full block range in tranches
769
        while tranche_start_height <= to_header.height {
×
770
            let tranche_end_height = cmp::min(
×
771
                tranche_start_height
×
772
                    .saturating_add(HORIZON_SYNC_TRANCHE_SIZE)
×
773
                    .saturating_sub(1),
×
774
                to_header.height,
×
775
            );
776

777
            let tranche_start_header =
×
778
                db.fetch_header(tranche_start_height)
×
779
                    .await?
×
780
                    .ok_or_else(|| ChainStorageError::ValueNotFound {
×
781
                        entity: "Header",
782
                        field: "height",
783
                        value: tranche_start_height.to_string(),
×
784
                    })?;
×
785
            let tranche_end_header = if tranche_end_height == to_header.height {
×
786
                to_header.clone()
×
787
            } else {
788
                db.fetch_header(tranche_end_height)
×
789
                    .await?
×
790
                    .ok_or_else(|| ChainStorageError::ValueNotFound {
×
791
                        entity: "Header",
792
                        field: "height",
793
                        value: tranche_end_height.to_string(),
×
794
                    })?
×
795
            };
796

797
            debug!(
×
798
                target: LOG_TARGET,
×
799
                "Syncing output tranche heights {}-{} ({} blocks) from peer {}",
800
                tranche_start_height,
801
                tranche_end_height,
802
                tranche_end_height - tranche_start_height + 1,
×
803
                sync_peer.node_id(),
×
804
            );
805

806
            let req = SyncUtxosRequest {
×
807
                start_header_hash: tranche_start_header.hash().to_vec(),
×
808
                end_header_hash: tranche_end_header.hash().to_vec(),
×
809
            };
×
810
            let mut output_stream = tokio::time::timeout(self.config.rpc_deadline, client.sync_utxos(req))
×
811
                .await
×
812
                .map_err(|_| {
×
813
                    HorizonSyncError::RpcStatus(RpcStatus::general(&format!(
×
814
                        "Timed out waiting for sync_utxos stream from peer {}",
×
815
                        sync_peer.node_id()
×
816
                    )))
×
817
                })??;
×
818

819
            let mut txn = db.write_transaction();
×
820
            let mut utxo_counter = 0u64;
×
821
            let mut stxo_counter = 0u64;
×
822
            let mut items_processed = 0u64;
×
823
            let mut last_sync_timer = Instant::now();
×
824
            let mut avg_latency = RollingAverageTime::new(20);
×
825

826
            // Accumulate SMT updates for the current tranche only. These are not applied until the full tranche
827
            // stream has been received and the SMT root verified.
828
            let mut state_tree_updates = BTreeMap::<FixedHash, Option<FixedHash>>::new();
×
829
            let mut inputs_to_delete = Vec::new();
×
830
            let mut batch_op_counter = 0;
×
831
            let mut last_mined_header: Option<FixedHash> = None;
×
832

833
            while let Some(response) = output_stream.next().await {
×
834
                let latency = last_sync_timer.elapsed();
×
835
                avg_latency.add_sample(latency);
×
836
                let res: SyncUtxosResponse = response?;
×
837

838
                let output_header_hash = FixedHash::try_from(res.mined_header).map_err(|e| {
×
839
                    HorizonSyncError::IncorrectResponse(format!("Peer sent invalid mined header: {}", e))
×
840
                })?;
×
841
                last_mined_header = Some(output_header_hash);
×
842
                let current_header = self
×
843
                    .db()
×
844
                    .fetch_header_by_block_hash(output_header_hash)
×
845
                    .await?
×
846
                    .ok_or_else(|| {
×
847
                        HorizonSyncError::IncorrectResponse("Peer sent mined header we do not know of".into())
×
848
                    })?;
×
849

850
                let proto_output = res.txo.ok_or_else(|| {
×
851
                    HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into())
×
852
                })?;
×
853
                match proto_output {
×
854
                    Txo::Output(output) => {
×
855
                        let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
×
856
                        if !output.is_burned() {
×
857
                            utxo_counter += 1;
×
858
                            let output_hash = output.hash();
×
859
                            debug!(
×
860
                                target: LOG_TARGET,
×
861
                                "UTXO `{}` received from sync peer ({} of {})",
862
                                output_hash,
863
                                total_utxo_counter + utxo_counter,
×
864
                                self.num_outputs,
865
                            );
866
                            let key_bytes: [u8; 32] = output.commitment.as_bytes().try_into().map_err(|e| {
×
867
                                HorizonSyncError::IncorrectResponse(format!("Peer sent malformed commitment: {}", e))
×
868
                            })?;
×
869
                            let key = FixedHash::from(key_bytes);
×
870

871
                            if state_tree_updates
×
872
                                .insert(key, Some(output.smt_hash(current_header.height)))
×
873
                                .is_some()
×
874
                            {
875
                                return Err(HorizonSyncError::IncorrectResponse(
×
876
                                    "Peer sent duplicate output commitment during horizon sync".into(),
×
877
                                ));
×
878
                            }
×
879

880
                            let constants = self.rules.consensus_constants(current_header.height).clone();
×
881
                            validate_output_version(&constants, &output)?;
×
882
                            validate_individual_output(&output, &constants)?;
×
883
                            batch_verify_range_proofs(&self.prover, &[&output])?;
×
884

885
                            txn.insert_output_via_horizon_sync(
×
886
                                output,
×
887
                                current_header.hash(),
×
888
                                current_header.height,
×
889
                                current_header.timestamp.as_u64(),
×
890
                            );
891
                            batch_op_counter += 1;
×
892
                        }
×
893
                    },
894
                    Txo::Commitment(commitment_bytes) => {
×
895
                        stxo_counter += 1;
×
896

897
                        let commitment = CompressedCommitment::from_canonical_bytes(commitment_bytes.as_slice())?;
×
898
                        match self
×
899
                            .db()
×
900
                            .fetch_unspent_output_hash_by_commitment(commitment.clone())
×
901
                            .await?
×
902
                        {
903
                            Some(output_hash) => {
×
904
                                debug!(
×
905
                                    target: LOG_TARGET,
×
906
                                    "STXO hash `{output_hash}` received from sync peer ({stxo_counter})",
907
                                );
908
                                let key_bytes: [u8; 32] = commitment_bytes.as_slice().try_into().map_err(|e| {
×
909
                                    HorizonSyncError::IncorrectResponse(format!(
×
910
                                        "Peer sent malformed commitment: {}",
×
911
                                        e
×
912
                                    ))
×
913
                                })?;
×
914
                                let key = FixedHash::from(key_bytes);
×
915

916
                                if matches!(state_tree_updates.get(&key), Some(None)) {
×
917
                                    return Err(HorizonSyncError::ChainStorageError(
×
918
                                        ChainStorageError::UnspendableInput,
×
919
                                    ));
×
920
                                }
×
921
                                state_tree_updates.insert(key, None);
×
922

923
                                let output_info = self.db().fetch_output(output_hash).await?.ok_or_else(|| {
×
924
                                    HorizonSyncError::IncorrectResponse(
×
925
                                        "Could not fetch full output for spent commitment".into(),
×
926
                                    )
×
927
                                })?;
×
928
                                inputs_to_delete.push(output_info.output);
×
929
                            },
930
                            None => {
931
                                return Err(HorizonSyncError::IncorrectResponse(
×
932
                                    "Peer sent unknown commitment hash".into(),
×
933
                                ));
×
934
                            },
935
                        }
936
                    },
937
                }
938

939
                items_processed += 1;
×
940
                if items_processed.is_multiple_of(PROGRESS_REPORT_INTERVAL) {
×
941
                    let utxo_progress = total_utxo_counter + utxo_counter;
×
942
                    let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
×
943
                        current: utxo_progress,
×
944
                        total: self.num_outputs,
×
945
                        sync_peer: sync_peer.clone(),
×
946
                    });
×
947
                    self.hooks.call_on_progress_horizon_hooks(info);
×
948
                }
×
949

950
                if batch_op_counter >= HORIZON_SYNC_BATCH_SIZE {
×
951
                    txn.commit().await?;
×
952
                    txn = db.write_transaction();
×
953
                    batch_op_counter = 0;
×
954
                }
×
955

956
                sync_peer.set_latency(latency);
×
957
                sync_peer.add_sample(last_sync_timer.elapsed());
×
958
                last_sync_timer = Instant::now();
×
959
            }
960

961
            // Verify the stream completed the full tranche before committing.
962
            if let Some(last_hash) = last_mined_header &&
×
963
                last_hash != tranche_end_header.hash()
×
964
            {
965
                return Err(HorizonSyncError::IncorrectResponse(format!(
×
966
                    "Sync peer did not complete output stream for tranche {}-{}. Last block received: {}, expected: {}",
×
967
                    tranche_start_height,
×
968
                    tranche_end_height,
×
969
                    last_hash.to_hex(),
×
970
                    tranche_end_header.hash().to_hex()
×
971
                )));
×
972
            }
×
973

974
            let tranche_updates = state_tree_updates
×
975
                .into_iter()
×
976
                .map(|(key, value)| HorizonStateTreeUpdate { key, value })
×
977
                .collect::<Vec<_>>();
×
978

979
            txn.apply_horizon_state_tree_updates(jmt_version, tranche_end_header.height, tranche_updates);
×
980
            for output in &inputs_to_delete {
×
981
                if let Some(sidechain_feature) = output.features.sidechain_feature.as_ref() &&
×
982
                    let Some(vn_reg) = sidechain_feature.validator_node_registration()
×
983
                {
×
984
                    txn.delete_validator_node(
×
985
                        sidechain_feature.sidechain_public_key().cloned(),
×
986
                        vn_reg.public_key().clone(),
×
987
                    );
×
988
                }
×
989
            }
990
            for output in inputs_to_delete {
×
991
                txn.prune_output_from_all_dbs(output.hash(), output.commitment.clone(), output.features.output_type);
×
992
            }
×
993
            // Only checkpoint intermediate tranches for network recovery.
994
            // We intentionally do not checkpoint the final tranche before verification.
995
            // If the final root verification fails, the ENTIRE state is considered poisoned.
996
            if tranche_end_height < to_header.height {
×
997
                txn.set_horizon_sync_output_checkpoint(HorizonSyncOutputCheckpoint {
×
998
                    checkpoint_height: tranche_end_height,
×
999
                    checkpoint_hash: tranche_end_header.hash(),
×
1000
                    sync_target_height: to_header.height,
×
1001
                    sync_target_hash: to_header.hash(),
×
1002
                });
×
1003
            }
×
1004
            txn.commit().await?;
×
1005
            jmt_version = tranche_end_height;
×
1006

1007
            debug!(
×
1008
                target: LOG_TARGET,
×
1009
                "Committed output tranche heights {}-{}: {} UTXOs and {} STXOs",
1010
                tranche_start_height,
1011
                tranche_end_height,
1012
                utxo_counter,
1013
                stxo_counter,
1014
            );
1015

1016
            total_utxo_counter += utxo_counter;
×
1017
            total_stxo_counter += stxo_counter;
×
1018
            tranche_start_height = tranche_end_height + 1;
×
1019
        }
1020

1021
        if let Err(e) = db
×
1022
            .verify_horizon_sync_output_root(to_header.height, to_header.output_mr)
×
1023
            .await
×
1024
        {
1025
            warn!(
×
1026
                target: LOG_TARGET,
×
1027
                "Final JMT root verification failed! The entire synced state is poisoned. Clearing checkpoint."
1028
            );
1029
            let _unused = db
×
1030
                .write_transaction()
×
1031
                .clear_horizon_sync_output_checkpoint()
×
1032
                .commit()
×
1033
                .await;
×
1034
            return Err(HorizonSyncError::ChainStorageError(e));
×
1035
        }
×
1036

1037
        // Mark output sync as complete
1038
        db.write_transaction()
×
1039
            .set_horizon_sync_output_checkpoint(HorizonSyncOutputCheckpoint {
×
1040
                checkpoint_height: to_header.height,
×
1041
                checkpoint_hash: to_header.hash(),
×
1042
                sync_target_height: to_header.height,
×
1043
                sync_target_hash: to_header.hash(),
×
1044
            })
×
1045
            .commit()
×
1046
            .await?;
×
1047

1048
        debug!(
×
1049
            target: LOG_TARGET,
×
1050
            "Finished syncing TXOs: {} unspent and {} spent downloaded in {:.2?}",
1051
            total_utxo_counter,
1052
            total_stxo_counter,
1053
            timer.elapsed()
×
1054
        );
1055
        Ok(())
×
1056
    }
×
1057

1058
    // Finalize the horizon state synchronization by setting the chain metadata to the local tip and committing
1059
    // the horizon state to the blockchain backend.
1060
    async fn finalize_horizon_sync(&mut self, sync_peer: &SyncPeer) -> Result<(), HorizonSyncError> {
×
1061
        debug!(target: LOG_TARGET, "Validating horizon state");
×
1062

1063
        self.hooks.call_on_progress_horizon_hooks(HorizonSyncInfo::new(
×
1064
            vec![sync_peer.node_id().clone()],
×
1065
            HorizonSyncStatus::Finalizing,
×
1066
        ));
1067

1068
        let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
×
1069
        let (calc_utxo_sum, calc_kernel_sum, calc_burned_sum) = self.calculate_commitment_sums(&header).await?;
×
1070

1071
        self.final_state_validator
×
1072
            .validate(
×
1073
                &*self.db().inner().db_read_access()?,
×
1074
                header.height(),
×
1075
                &calc_utxo_sum,
×
1076
                &calc_kernel_sum,
×
1077
                &calc_burned_sum,
×
1078
            )
1079
            .map_err(HorizonSyncError::FinalStateValidationFailed)?;
×
1080

1081
        let metadata = self.db().get_chain_metadata().await?;
×
1082
        info!(
×
1083
            target: LOG_TARGET,
×
1084
            "Horizon state validation succeeded! Committing horizon state."
1085
        );
1086
        self.db()
×
1087
            .write_transaction()
×
1088
            .set_best_block(
×
1089
                header.height(),
×
1090
                *header.hash(),
×
1091
                header.accumulated_data().total_accumulated_difficulty,
×
1092
                *metadata.best_block_hash(),
×
1093
                header.timestamp(),
×
1094
            )
×
1095
            .set_pruned_height(header.height())
×
1096
            .set_horizon_data(calc_kernel_sum, calc_utxo_sum)
×
1097
            .clear_horizon_sync_output_checkpoint()
×
1098
            .commit()
×
1099
            .await?;
×
1100

1101
        Ok(())
×
1102
    }
×
1103

1104
    /// (UTXO sum, Kernel sum)
1105
    async fn calculate_commitment_sums(
×
1106
        &mut self,
×
1107
        header: &ChainHeader,
×
1108
    ) -> Result<(CompressedCommitment, CompressedCommitment, CompressedCommitment), HorizonSyncError> {
×
1109
        let mut utxo_sum = HomomorphicCommitment::default();
×
1110
        let mut kernel_sum = HomomorphicCommitment::default();
×
1111
        let mut burned_sum = HomomorphicCommitment::default();
×
1112

1113
        let mut prev_kernel_mmr = 0;
×
1114

1115
        let height = header.height();
×
1116
        let db = self.db().inner().clone();
×
1117
        let header_hash = *header.hash();
×
1118
        task::spawn_blocking(move || {
×
1119
            for h in 0..=height {
×
1120
                let curr_header = db.fetch_chain_header(h)?;
×
1121
                trace!(
×
1122
                    target: LOG_TARGET,
×
1123
                    "Fetching utxos from db: height:{}",
1124
                    curr_header.height(),
×
1125
                );
1126
                let utxos = db.fetch_outputs_in_block_with_spend_state(*curr_header.hash(), Some(header_hash))?;
×
1127
                debug!(
×
1128
                    target: LOG_TARGET,
×
1129
                    "{} output(s) loaded for height {}",
1130
                    utxos.len(),
×
1131
                    curr_header.height()
×
1132
                );
1133
                trace!(
×
1134
                    target: LOG_TARGET,
×
1135
                    "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
1136
                    curr_header.height(),
×
1137
                    curr_header.header().kernel_mmr_size,
×
1138
                    prev_kernel_mmr,
1139
                    curr_header.header().kernel_mmr_size.saturating_sub(1)
×
1140
                );
1141

1142
                trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
×
1143
                for (u, spent) in utxos {
×
1144
                    if !spent {
×
1145
                        utxo_sum = &u.commitment.to_commitment()? + &utxo_sum;
×
1146
                    }
×
1147
                }
1148

1149
                let kernels = db.fetch_kernels_in_block(*curr_header.hash())?;
×
1150
                trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
×
1151
                for k in kernels {
×
1152
                    kernel_sum = &k.excess.to_commitment()? + &kernel_sum;
×
1153
                    if k.is_burned() {
×
1154
                        burned_sum = &(k.get_burn_commitment()?.to_commitment()?) + &burned_sum;
×
1155
                    }
×
1156
                }
1157
                prev_kernel_mmr = curr_header.header().kernel_mmr_size;
×
1158

1159
                if h % 1000 == 0 && height != 0 {
×
1160
                    debug!(
×
1161
                        target: LOG_TARGET,
×
1162
                        "Final Validation: {:.2}% complete. Height: {} sync",
1163
                        (h as f32 / height as f32) * 100.0,
×
1164
                        h,
1165
                    );
1166
                }
×
1167
            }
1168

1169
            Ok((
×
1170
                CompressedCommitment::from_commitment(utxo_sum),
×
1171
                CompressedCommitment::from_commitment(kernel_sum),
×
1172
                CompressedCommitment::from_commitment(burned_sum),
×
1173
            ))
×
1174
        })
×
1175
        .await?
×
1176
    }
×
1177

1178
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
1179
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
×
1180
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
×
1181
            self.sync_peers.remove(pos);
×
1182
        }
×
1183
    }
×
1184

1185
    // Helper function to get the index to the node_id inside of the vec of peers
1186
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
×
1187
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
×
1188
    }
×
1189

1190
    #[inline]
1191
    fn db(&self) -> &AsyncBlockchainDb<B> {
×
1192
        &self.db
×
1193
    }
×
1194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc