• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 23541521288

25 Mar 2026 12:41PM UTC coverage: 61.304% (-0.1%) from 61.403%
23541521288

push

github

web-flow
chore: fix most cucumber tests (#7710)

Description
---
flags broken tests

37 of 289 new or added lines in 26 files covered. (12.8%)

25 existing lines in 10 files now uncovered.

70599 of 115162 relevant lines covered (61.3%)

225644.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/sync/horizon_state_sync/synchronizer.rs
1
//  Copyright 2022, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    cmp,
25
    collections::BTreeMap,
26
    convert::{TryFrom, TryInto},
27
    sync::Arc,
28
    time::{Duration, Instant},
29
};
30

31
use futures::StreamExt;
32
use log::*;
33
use tari_common_types::types::{CompressedCommitment, FixedHash, RangeProofService};
34
use tari_comms::{
35
    PeerConnection,
36
    connectivity::ConnectivityRequester,
37
    peer_manager::NodeId,
38
    protocol::rpc::{RpcClient, RpcStatus},
39
};
40
use tari_crypto::commitment::HomomorphicCommitment;
41
use tari_node_components::blocks::{BlockHeader, ChainHeader};
42
use tari_transaction_components::{
43
    BanPeriod,
44
    transaction_components::{TransactionKernel, TransactionOutput, transaction_output::batch_verify_range_proofs},
45
    validation::{aggregate_body::validate_individual_output, helpers::validate_output_version},
46
};
47
use tari_utilities::{ByteArray, hex::Hex};
48
use tokio::task;
49

50
use super::error::HorizonSyncError;
51
use crate::{
52
    PrunedKernelMmr,
53
    base_node::sync::{
54
        BlockchainSyncConfig,
55
        SyncPeer,
56
        ban::PeerBanManager,
57
        hooks::Hooks,
58
        horizon_state_sync::{HorizonSyncInfo, HorizonSyncStatus},
59
        rpc,
60
        rpc::BaseNodeSyncRpcClient,
61
    },
62
    blocks::UpdateBlockAccumulatedData,
63
    chain_storage::{
64
        BlockchainBackend,
65
        ChainStorageError,
66
        HorizonStateTreeUpdate,
67
        HorizonSyncOutputCheckpoint,
68
        MmrTree,
69
        async_db::AsyncBlockchainDb,
70
    },
71
    common::rolling_avg::RollingAverageTime,
72
    consensus::BaseNodeConsensusManager,
73
    proto::base_node::{SyncKernelsRequest, SyncUtxosRequest, SyncUtxosResponse, sync_utxos_response::Txo},
74
    validation::FinalHorizonStateValidation,
75
};
76

77
const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";
78

79
const MAX_LATENCY_INCREASES: usize = 5;
80
const HORIZON_SYNC_BATCH_SIZE: usize = 10_000;
81
const PROGRESS_REPORT_INTERVAL: u64 = 100;
82
const HORIZON_SYNC_TRANCHE_SIZE: u64 = 5_000;
83

84
pub struct HorizonStateSynchronization<'a, B> {
85
    config: BlockchainSyncConfig,
86
    db: AsyncBlockchainDb<B>,
87
    rules: BaseNodeConsensusManager,
88
    sync_peers: &'a mut Vec<SyncPeer>,
89
    horizon_sync_height: u64,
90
    prover: Arc<RangeProofService>,
91
    num_kernels: u64,
92
    num_outputs: u64,
93
    hooks: Hooks,
94
    connectivity: ConnectivityRequester,
95
    final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
96
    max_latency: Duration,
97
    peer_ban_manager: PeerBanManager,
98
}
99

100
impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
101
    #[allow(clippy::too_many_arguments)]
102
    pub fn new(
×
103
        config: BlockchainSyncConfig,
×
104
        db: AsyncBlockchainDb<B>,
×
105
        connectivity: ConnectivityRequester,
×
106
        rules: BaseNodeConsensusManager,
×
107
        sync_peers: &'a mut Vec<SyncPeer>,
×
108
        horizon_sync_height: u64,
×
109
        prover: Arc<RangeProofService>,
×
110
        final_state_validator: Arc<dyn FinalHorizonStateValidation<B>>,
×
111
    ) -> Self {
×
112
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
×
113
        Self {
×
114
            max_latency: config.initial_max_sync_latency,
×
115
            config,
×
116
            db,
×
117
            rules,
×
118
            connectivity,
×
119
            sync_peers,
×
120
            horizon_sync_height,
×
121
            prover,
×
122
            num_kernels: 0,
×
123
            num_outputs: 0,
×
124
            hooks: Hooks::default(),
×
125
            final_state_validator,
×
126
            peer_ban_manager,
×
127
        }
×
128
    }
×
129

130
    pub fn on_starting<H>(&mut self, hook: H)
×
131
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
×
132
        self.hooks.add_on_starting_hook(hook);
×
133
    }
×
134

135
    pub fn on_progress<H>(&mut self, hook: H)
×
136
    where H: Fn(HorizonSyncInfo) + Send + Sync + 'static {
×
137
        self.hooks.add_on_progress_horizon_hook(hook);
×
138
    }
×
139

140
    pub async fn synchronize(&mut self) -> Result<(), HorizonSyncError> {
×
141
        if self.sync_peers.is_empty() {
×
142
            return Err(HorizonSyncError::NoSyncPeers);
×
143
        }
×
144

145
        debug!(
×
146
            target: LOG_TARGET,
×
147
            "Preparing database for horizon sync to height #{}", self.horizon_sync_height
148
        );
149
        let to_header = self.db().fetch_header(self.horizon_sync_height).await?.ok_or_else(|| {
×
150
            ChainStorageError::ValueNotFound {
×
151
                entity: "Header",
×
152
                field: "height",
×
153
                value: self.horizon_sync_height.to_string(),
×
154
            }
×
155
        })?;
×
156

157
        let mut latency_increases_counter = 0;
×
158
        loop {
159
            match self.sync(&to_header).await {
×
160
                Ok(()) => return Ok(()),
×
161
                Err(err @ HorizonSyncError::AllSyncPeersExceedLatency) => {
×
162
                    // If we don't have many sync peers to select from, return the listening state and see if we can get
163
                    // some more.
164
                    warn!(
×
165
                        target: LOG_TARGET,
×
166
                        "Slow sync peers detected: {}",
167
                        self.sync_peers
×
168
                            .iter()
×
169
                            .map(|p| format!("{} ({:.2?})", p.node_id(), p.latency().unwrap_or_default()))
×
170
                            .collect::<Vec<_>>()
×
171
                            .join(", ")
×
172
                    );
173
                    if self.sync_peers.len() < 2 {
×
174
                        return Err(err);
×
175
                    }
×
176
                    self.max_latency += self.config.max_latency_increase;
×
177
                    latency_increases_counter += 1;
×
178
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
179
                        return Err(err);
×
180
                    }
×
181
                },
182
                Err(err) => return Err(err),
×
183
            }
184
        }
185
    }
×
186

187
    async fn sync(&mut self, to_header: &BlockHeader) -> Result<(), HorizonSyncError> {
×
188
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
×
189
        info!(
×
190
            target: LOG_TARGET,
×
191
            "Attempting to sync horizon state ({} sync peers)",
192
            sync_peer_node_ids.len()
×
193
        );
194
        let mut latency_counter = 0usize;
×
195
        for node_id in sync_peer_node_ids {
×
196
            match self.connect_and_attempt_sync(&node_id, to_header).await {
×
197
                Ok(_) => return Ok(()),
×
198
                // Try another peer
199
                Err(err) => {
×
200
                    let ban_reason = HorizonSyncError::get_ban_reason(&err);
×
201

202
                    if let Some(reason) = ban_reason {
×
203
                        let duration = match reason.ban_duration {
×
204
                            BanPeriod::Short => self.config.short_ban_period,
×
205
                            BanPeriod::Long => self.config.ban_period,
×
206
                        };
207
                        warn!(target: LOG_TARGET, "{err}");
×
208
                        self.peer_ban_manager
×
209
                            .ban_peer_if_required(&node_id, reason.reason, duration)
×
210
                            .await;
×
211
                    }
×
212
                    if let HorizonSyncError::MaxLatencyExceeded { .. } = err {
×
213
                        latency_counter += 1;
×
214
                    } else {
×
215
                        self.remove_sync_peer(&node_id);
×
216
                    }
×
217
                },
218
            }
219
        }
220

221
        if self.sync_peers.is_empty() {
×
222
            Err(HorizonSyncError::NoMoreSyncPeers("Header sync failed".to_string()))
×
223
        } else if latency_counter >= self.sync_peers.len() {
×
224
            Err(HorizonSyncError::AllSyncPeersExceedLatency)
×
225
        } else {
226
            Err(HorizonSyncError::FailedSyncAllPeers)
×
227
        }
228
    }
×
229

230
    async fn connect_and_attempt_sync(
×
231
        &mut self,
×
232
        node_id: &NodeId,
×
233
        to_header: &BlockHeader,
×
234
    ) -> Result<(), HorizonSyncError> {
×
235
        // Connect
236
        let (mut client, sync_peer) = self.connect_sync_peer(node_id).await?;
×
237

238
        // Perform horizon sync
239
        debug!(target: LOG_TARGET, "Check if pruning is needed");
×
240
        self.prune_if_needed().await?;
×
241
        self.sync_kernels_and_outputs(sync_peer.clone(), &mut client, to_header)
×
242
            .await?;
×
243

244
        // Validate and finalize horizon sync
245
        self.finalize_horizon_sync(&sync_peer).await?;
×
246

247
        Ok(())
×
248
    }
×
249

250
    async fn connect_sync_peer(
×
251
        &mut self,
×
252
        node_id: &NodeId,
×
253
    ) -> Result<(BaseNodeSyncRpcClient, SyncPeer), HorizonSyncError> {
×
254
        let peer_index = self
×
255
            .get_sync_peer_index(node_id)
×
256
            .ok_or(HorizonSyncError::PeerNotFound)?;
×
257
        let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
×
258
        self.hooks.call_on_starting_hook(sync_peer);
×
259

260
        let mut conn = self.dial_sync_peer(node_id).await?;
×
261
        debug!(
×
262
            target: LOG_TARGET,
×
263
            "Attempting to synchronize horizon state with `{node_id}`"
264
        );
265

266
        let config = RpcClient::builder()
×
267
            .with_deadline(self.config.rpc_deadline)
×
268
            .with_deadline_grace_period(Duration::from_secs(5));
×
269

270
        let mut client = conn
×
271
            .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
×
272
            .await?;
×
273

274
        let latency = client
×
275
            .get_last_request_latency()
×
276
            .expect("unreachable panic: last request latency must be set after connect");
×
277
        self.sync_peers
×
278
            .get_mut(peer_index)
×
279
            .expect("Already checked")
×
280
            .set_latency(latency);
×
281
        if latency > self.max_latency {
×
282
            return Err(HorizonSyncError::MaxLatencyExceeded {
×
283
                peer: conn.peer_node_id().clone(),
×
284
                latency,
×
285
                max_latency: self.max_latency,
×
286
            });
×
287
        }
×
288
        debug!(target: LOG_TARGET, "Sync peer latency is {latency:.2?}");
×
289

290
        Ok((
×
291
            client,
×
292
            self.sync_peers.get(peer_index).expect("Already checked").clone(),
×
293
        ))
×
294
    }
×
295

296
    async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
×
297
        let timer = Instant::now();
×
298
        debug!(target: LOG_TARGET, "Dialing {node_id} sync peer");
×
299
        let conn = self.connectivity.dial_peer(node_id.clone()).await?;
×
300
        info!(
×
301
            target: LOG_TARGET,
×
302
            "Successfully dialed sync peer {} in {:.2?}",
303
            node_id,
304
            timer.elapsed()
×
305
        );
306
        Ok(conn)
×
307
    }
×
308

309
    async fn sync_kernels_and_outputs(
×
310
        &mut self,
×
311
        sync_peer: SyncPeer,
×
312
        client: &mut rpc::BaseNodeSyncRpcClient,
×
313
        to_header: &BlockHeader,
×
314
    ) -> Result<(), HorizonSyncError> {
×
315
        // Note: We do not need to rewind kernels if the sync fails due to it being validated when inserted into
316
        //       the database. Furthermore, these kernels will also be successfully removed when we need to rewind
317
        //       the blockchain for whatever reason.
318
        debug!(target: LOG_TARGET, "Synchronizing kernels");
×
319
        self.synchronize_kernels(sync_peer.clone(), client, to_header).await?;
×
320
        debug!(target: LOG_TARGET, "Synchronizing outputs");
×
321
        // let cloned_backup_smt = self.db.inner().smt_read_access()?.clone();
322
        match self.synchronize_outputs(sync_peer, client, to_header).await {
×
323
            Ok(_) => Ok(()),
×
324
            Err(err) => {
×
325
                // We need to clean up the outputs
326
                let _ = self.clean_up_failed_output_sync(to_header).await;
×
327
                // let mut smt = self.db.inner().smt_write_access()?;
328
                // *smt = cloned_backup_smt;
329
                Err(err)
×
330
            },
331
        }
332
    }
×
333

334
    /// Cleanup stops at the last committed checkpoint so that previously-completed tranches are not disturbed.
335
    async fn clean_up_failed_output_sync(&mut self, to_header: &BlockHeader) {
×
336
        // Determine where to stop cleaning. If a tranche checkpoint exists, stop at the checkpoint block
337
        // Otherwise fall back to the current chain tip.
338
        let stop_hash = match self.db.fetch_horizon_sync_output_checkpoint().await {
×
339
            Ok(Some(cp)) => match self.db.fetch_header(cp.checkpoint_height).await {
×
340
                Ok(Some(header)) => Some(header.hash()),
×
341
                _ => None,
×
342
            },
343
            _ => None,
×
344
        };
345
        let stop_hash = match stop_hash {
×
346
            Some(h) => h,
×
347
            None => match self.db.fetch_header(0).await {
×
348
                Ok(Some(header)) => header.hash(),
×
349
                _ => return,
×
350
            },
351
        };
352

353
        let db = self.db().clone();
×
354
        let mut txn = db.write_transaction();
×
355
        let mut current_header = to_header.clone();
×
356
        loop {
357
            if let Ok(outputs) = self.db.fetch_outputs_in_block(current_header.hash()).await {
×
358
                for (count, output) in (1..=outputs.len()).zip(outputs.iter()) {
×
359
                    txn.prune_output_from_all_dbs(
×
360
                        output.hash(),
×
361
                        output.commitment.clone(),
×
362
                        output.features.output_type,
×
363
                    );
364
                    if (count % 100 == 0 || count == outputs.len()) &&
×
365
                        let Err(e) = txn.commit().await
×
366
                    {
367
                        warn!(
×
368
                            target: LOG_TARGET,
×
369
                            "Clean up failed sync - commit prune outputs for header '{}': {}",
370
                            current_header.hash(), e
×
371
                        );
372
                    }
×
373
                }
374
            }
×
375

376
            if let Ok(header) = db.fetch_header_by_block_hash(current_header.prev_hash).await {
×
377
                if let Some(previous_header) = header {
×
378
                    current_header = previous_header;
×
379
                } else {
×
380
                    warn!(target: LOG_TARGET, "Could not clean up failed output sync, previous_header link missing from db");
×
381
                    break;
×
382
                }
383
            } else {
384
                warn!(
×
385
                    target: LOG_TARGET,
×
386
                    "Could not clean up failed output sync, header '{}' not in db",
387
                    current_header.prev_hash.to_hex()
×
388
                );
389
                break;
×
390
            }
391
            if current_header.hash() == stop_hash {
×
392
                debug!(target: LOG_TARGET, "Reached stop point while cleaning up failed output sync");
×
393
                break;
×
394
            }
×
395
        }
396

397
        if let Err(e) = txn.commit().await {
×
398
            warn!(
×
399
                target: LOG_TARGET,
×
400
                "Clean up failed output sync - final commit failed: {}",
401
                e
402
            );
403
        }
×
404
    }
×
405

406
    /// Removes any outputs stored in the given block height range from the database.
407
    /// On a fresh start all `fetch_outputs_in_block` calls return empty, so this is a no-op.
408
    async fn clean_up_height_range(&mut self, start_height: u64, end_height: u64) -> Result<(), HorizonSyncError> {
×
409
        let db = self.db().clone();
×
410
        let mut txn = db.write_transaction();
×
411
        let mut count: u64 = 0;
×
412
        for height in start_height..=end_height {
×
413
            let header = db
×
414
                .fetch_header(height)
×
415
                .await?
×
416
                .ok_or_else(|| ChainStorageError::ValueNotFound {
×
417
                    entity: "Header",
418
                    field: "height",
419
                    value: height.to_string(),
×
420
                })?;
×
421
            let outputs = db.fetch_outputs_in_block(header.hash()).await?;
×
422
            for output in outputs {
×
423
                txn.prune_output_from_all_dbs(output.hash(), output.commitment.clone(), output.features.output_type);
×
424
                count += 1;
×
NEW
425
                if count.is_multiple_of(PROGRESS_REPORT_INTERVAL) {
×
426
                    txn.commit().await?;
×
427
                    txn = db.write_transaction();
×
428
                }
×
429
            }
430
        }
431
        txn.commit().await?;
×
432
        if count > 0 {
×
433
            debug!(
×
434
                target: LOG_TARGET,
×
435
                "Cleaned up {} partial output(s) from height range {}-{}", count, start_height, end_height
436
            );
437
        }
×
438
        Ok(())
×
439
    }
×
440

441
    async fn prune_if_needed(&mut self) -> Result<(), HorizonSyncError> {
×
442
        let local_metadata = self.db.get_chain_metadata().await?;
×
443
        let new_prune_height = cmp::min(local_metadata.best_block_height(), self.horizon_sync_height);
×
444
        if local_metadata.pruned_height() < new_prune_height {
×
445
            debug!(target: LOG_TARGET, "Pruning block chain to height {new_prune_height}");
×
446
            self.db.prune_to_height(new_prune_height).await?;
×
447
        }
×
448

449
        Ok(())
×
450
    }
×
451

452
    #[allow(clippy::too_many_lines)]
453
    async fn synchronize_kernels(
×
454
        &mut self,
×
455
        mut sync_peer: SyncPeer,
×
456
        client: &mut rpc::BaseNodeSyncRpcClient,
×
457
        to_header: &BlockHeader,
×
458
    ) -> Result<(), HorizonSyncError> {
×
459
        info!(target: LOG_TARGET, "Starting kernel sync from peer {sync_peer}");
×
460
        let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;
×
461

462
        let remote_num_kernels = to_header.kernel_mmr_size;
×
463
        self.num_kernels = remote_num_kernels;
×
464

465
        if local_num_kernels >= remote_num_kernels {
×
466
            debug!(target: LOG_TARGET, "Local kernel set already synchronized");
×
467
            return Ok(());
×
468
        }
×
469

470
        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
×
471
            current: local_num_kernels,
×
472
            total: remote_num_kernels,
×
473
            sync_peer: sync_peer.clone(),
×
474
        });
×
475
        self.hooks.call_on_progress_horizon_hooks(info);
×
476

477
        debug!(
×
478
            target: LOG_TARGET,
×
479
            "Requesting kernels from {} to {} ({} remaining)",
480
            local_num_kernels,
481
            remote_num_kernels,
482
            remote_num_kernels.saturating_sub(local_num_kernels),
×
483
        );
484

485
        let latency = client.get_last_request_latency();
×
486
        debug!(
×
487
            target: LOG_TARGET,
×
488
            "Initiating kernel sync with peer `{}` (latency = {}ms)",
489
            sync_peer.node_id(),
×
490
            latency.unwrap_or_default().as_millis()
×
491
        );
492

493
        let mut current_header = self.db().fetch_header_containing_kernel_mmr(local_num_kernels).await?;
×
494
        let req = SyncKernelsRequest {
×
495
            start: local_num_kernels,
×
496
            end_header_hash: to_header.hash().to_vec(),
×
497
        };
×
498
        let mut kernel_stream = client.sync_kernels(req).await?;
×
499

500
        debug!(
×
501
            target: LOG_TARGET,
×
502
            "Found header for kernels at mmr pos: {} height: {}",
503
            local_num_kernels,
504
            current_header.height()
×
505
        );
506
        let mut kernel_hashes = vec![];
×
507
        let db = self.db().clone();
×
508
        let mut txn = db.write_transaction();
×
509
        let mut mmr_position = local_num_kernels;
×
510
        let end = remote_num_kernels;
×
511
        let mut last_sync_timer = Instant::now();
×
512
        let mut avg_latency = RollingAverageTime::new(20);
×
513
        while let Some(kernel) = kernel_stream.next().await {
×
514
            let latency = last_sync_timer.elapsed();
×
515
            avg_latency.add_sample(latency);
×
516
            let kernel: TransactionKernel = kernel?.try_into().map_err(HorizonSyncError::ConversionError)?;
×
517
            kernel.verify_signature()?;
×
518

519
            kernel_hashes.push(kernel.hash());
×
520

521
            if mmr_position > end {
×
522
                return Err(HorizonSyncError::IncorrectResponse(
×
523
                    "Peer sent too many kernels".to_string(),
×
524
                ));
×
525
            }
×
526

527
            txn.insert_kernel_via_horizon_sync(kernel, *current_header.hash(), mmr_position);
×
528
            if mmr_position == current_header.header().kernel_mmr_size.saturating_sub(1) {
×
529
                let num_kernels = kernel_hashes.len();
×
530
                debug!(
×
531
                    target: LOG_TARGET,
×
532
                    "Header #{} ({} kernels, latency: {:.2?})",
533
                    current_header.height(),
×
534
                    num_kernels,
535
                    latency
536
                );
537
                // Validate root
538
                let block_data = db
×
539
                    .fetch_block_accumulated_data(current_header.header().prev_hash)
×
540
                    .await?;
×
541
                let kernel_pruned_set = block_data.dissolve();
×
542
                let mut kernel_mmr = PrunedKernelMmr::new(kernel_pruned_set);
×
543

544
                for hash in kernel_hashes.drain(..) {
×
545
                    kernel_mmr.push(hash.to_vec())?;
×
546
                }
547

548
                let mmr_root = kernel_mmr.get_merkle_root()?;
×
549
                if mmr_root.as_slice() != current_header.header().kernel_mr.as_slice() {
×
550
                    return Err(HorizonSyncError::InvalidMrRoot {
×
551
                        mr_tree: MmrTree::Kernel.to_string(),
×
552
                        at_height: current_header.height(),
×
553
                        expected_hex: current_header.header().kernel_mr.to_hex(),
×
554
                        actual_hex: mmr_root.to_hex(),
×
555
                    });
×
556
                }
×
557

558
                let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?;
×
559
                debug!(
×
560
                    target: LOG_TARGET,
×
561
                    "Updating block data at height {}",
562
                    current_header.height()
×
563
                );
564
                txn.update_block_accumulated_data_via_horizon_sync(
×
565
                    *current_header.hash(),
×
566
                    UpdateBlockAccumulatedData {
×
567
                        kernel_hash_set: Some(kernel_hash_set),
×
568
                        ..Default::default()
×
569
                    },
×
570
                );
571

572
                txn.commit().await?;
×
573
                debug!(
×
574
                    target: LOG_TARGET,
×
575
                    "Committed {} kernel(s), ({}/{}) {} remaining",
576
                    num_kernels,
577
                    mmr_position + 1,
×
578
                    end,
579
                    end.saturating_sub(mmr_position + 1)
×
580
                );
581
                if mmr_position < end.saturating_sub(1) {
×
582
                    current_header = db.fetch_chain_header(current_header.height() + 1).await?;
×
583
                }
×
584
            }
×
585
            mmr_position += 1;
×
586

587
            sync_peer.set_latency(latency);
×
588
            sync_peer.add_sample(last_sync_timer.elapsed());
×
589
            if mmr_position % 100 == 0 || mmr_position == self.num_kernels {
×
590
                let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Kernels {
×
591
                    current: mmr_position,
×
592
                    total: self.num_kernels,
×
593
                    sync_peer: sync_peer.clone(),
×
594
                });
×
595
                self.hooks.call_on_progress_horizon_hooks(info);
×
596
            }
×
597

598
            self.check_latency(sync_peer.node_id(), &avg_latency)?;
×
599

600
            last_sync_timer = Instant::now();
×
601
        }
602

603
        if mmr_position != end {
×
604
            return Err(HorizonSyncError::IncorrectResponse(
×
605
                "Sync node did not send all kernels requested".to_string(),
×
606
            ));
×
607
        }
×
608
        Ok(())
×
609
    }
×
610

611
    fn check_latency(&self, peer: &NodeId, avg_latency: &RollingAverageTime) -> Result<(), HorizonSyncError> {
×
612
        if let Some(avg_latency) = avg_latency.calculate_average_with_min_samples(5) &&
×
613
            avg_latency > self.max_latency
×
614
        {
615
            return Err(HorizonSyncError::MaxLatencyExceeded {
×
616
                peer: peer.clone(),
×
617
                latency: avg_latency,
×
618
                max_latency: self.max_latency,
×
619
            });
×
620
        }
×
621

622
        Ok(())
×
623
    }
×
624

625
    // Synchronize outputs in independently-verifiable tranches
626
    #[allow(clippy::too_many_lines)]
627
    async fn synchronize_outputs(
×
628
        &mut self,
×
629
        mut sync_peer: SyncPeer,
×
630
        client: &mut rpc::BaseNodeSyncRpcClient,
×
631
        to_header: &BlockHeader,
×
632
    ) -> Result<(), HorizonSyncError> {
×
633
        info!(target: LOG_TARGET, "Starting output sync from peer {sync_peer}");
×
634
        let db = self.db().clone();
×
635

636
        let stored_checkpoint = db.fetch_horizon_sync_output_checkpoint().await?;
×
637

638
        let checkpoint_height = match stored_checkpoint {
×
639
            Some(ref cp) if cp.sync_target_height == to_header.height && cp.sync_target_hash == to_header.hash() => {
×
640
                match db.fetch_header(cp.checkpoint_height).await? {
×
641
                    Some(header) if header.hash() == cp.checkpoint_hash => {
×
642
                        info!(
×
643
                            target: LOG_TARGET,
×
644
                            "Resuming output sync from checkpoint at height {}, target unchanged",
645
                            cp.checkpoint_height
646
                        );
647
                        Some(cp.checkpoint_height)
×
648
                    },
649
                    _ => {
650
                        warn!(
×
651
                            target: LOG_TARGET,
×
652
                            "Horizon sync checkpoint at height {} is no longer on the canonical chain (reorg \
653
                             detected). Discarding checkpoint and restarting output sync from scratch.",
654
                            cp.checkpoint_height
655
                        );
656
                        db.write_transaction()
×
657
                            .clear_horizon_sync_output_checkpoint()
×
658
                            .commit()
×
659
                            .await?;
×
660
                        None
×
661
                    },
662
                }
663
            },
664
            Some(ref cp) => {
×
665
                warn!(
×
666
                    target: LOG_TARGET,
×
667
                    "Horizon sync target changed from height {} to {}. Discarding checkpoint and cleaning up \
668
                     partial outputs.",
669
                    cp.sync_target_height,
670
                    to_header.height
671
                );
672
                db.write_transaction()
×
673
                    .clear_horizon_sync_output_checkpoint()
×
674
                    .commit()
×
675
                    .await?;
×
676
                if let Ok(Some(cleanup_header)) = db.fetch_header(cp.checkpoint_height).await {
×
677
                    self.clean_up_failed_output_sync(&cleanup_header).await;
×
678
                }
×
679
                None
×
680
            },
681
            None => None,
×
682
        };
683
        let (sync_start_height, mut jmt_version) = match checkpoint_height {
×
684
            Some(h) => {
×
685
                // Only the in-progress (first resumption) tranche may have partial output data.
686
                let first_tranche_end = cmp::min(
×
687
                    (h + 1).saturating_add(HORIZON_SYNC_TRANCHE_SIZE).saturating_sub(1),
×
688
                    to_header.height,
×
689
                );
690
                self.clean_up_height_range(h + 1, first_tranche_end).await?;
×
691
                (h + 1, h)
×
692
            },
693
            None => {
694
                self.clean_up_height_range(0, to_header.height).await?;
×
695
                (0, 0)
×
696
            },
697
        };
698

699
        if sync_start_height > to_header.height {
×
700
            info!(
×
701
                target: LOG_TARGET,
×
702
                "Output sync already complete (sync_start_height {} > horizon height {})",
703
                sync_start_height,
704
                to_header.height
705
            );
706
            return Ok(());
×
707
        }
×
708

709
        self.num_outputs = to_header.output_smt_size;
×
710

711
        let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
×
712
            current: 0,
×
713
            total: self.num_outputs,
×
714
            sync_peer: sync_peer.clone(),
×
715
        });
×
716
        self.hooks.call_on_progress_horizon_hooks(info);
×
717

718
        let latency = client.get_last_request_latency();
×
719
        debug!(
×
720
            target: LOG_TARGET,
×
721
            "Initiating output sync with peer `{}`, requesting ~{} outputs from height {} to height {} \
722
            last_chain_header height `{}` (latency = {}ms)",
723
            sync_peer.node_id(),
×
724
            self.num_outputs,
725
            sync_start_height,
726
            to_header.height,
727
            db.fetch_last_chain_header().await?.height(),
×
728
            latency.unwrap_or_default().as_millis(),
×
729
        );
730

731
        let timer = Instant::now();
×
732
        let mut total_utxo_counter = 0u64;
×
733
        let mut total_stxo_counter = 0u64;
×
734
        let mut tranche_start_height = sync_start_height;
×
735

736
        // Process the full block range in tranches
737
        while tranche_start_height <= to_header.height {
×
738
            let tranche_end_height = cmp::min(
×
739
                tranche_start_height
×
740
                    .saturating_add(HORIZON_SYNC_TRANCHE_SIZE)
×
741
                    .saturating_sub(1),
×
742
                to_header.height,
×
743
            );
744

745
            let tranche_start_header =
×
746
                db.fetch_header(tranche_start_height)
×
747
                    .await?
×
748
                    .ok_or_else(|| ChainStorageError::ValueNotFound {
×
749
                        entity: "Header",
750
                        field: "height",
751
                        value: tranche_start_height.to_string(),
×
752
                    })?;
×
753
            let tranche_end_header = if tranche_end_height == to_header.height {
×
754
                to_header.clone()
×
755
            } else {
756
                db.fetch_header(tranche_end_height)
×
757
                    .await?
×
758
                    .ok_or_else(|| ChainStorageError::ValueNotFound {
×
759
                        entity: "Header",
760
                        field: "height",
761
                        value: tranche_end_height.to_string(),
×
762
                    })?
×
763
            };
764

765
            debug!(
×
766
                target: LOG_TARGET,
×
767
                "Syncing output tranche heights {}-{} ({} blocks) from peer {}",
768
                tranche_start_height,
769
                tranche_end_height,
770
                tranche_end_height - tranche_start_height + 1,
×
771
                sync_peer.node_id(),
×
772
            );
773

774
            let req = SyncUtxosRequest {
×
775
                start_header_hash: tranche_start_header.hash().to_vec(),
×
776
                end_header_hash: tranche_end_header.hash().to_vec(),
×
777
            };
×
778
            let mut output_stream = tokio::time::timeout(self.config.rpc_deadline, client.sync_utxos(req))
×
779
                .await
×
780
                .map_err(|_| {
×
781
                    HorizonSyncError::RpcStatus(RpcStatus::general(&format!(
×
782
                        "Timed out waiting for sync_utxos stream from peer {}",
×
783
                        sync_peer.node_id()
×
784
                    )))
×
785
                })??;
×
786

787
            let mut txn = db.write_transaction();
×
788
            let mut utxo_counter = 0u64;
×
789
            let mut stxo_counter = 0u64;
×
790
            let mut items_processed = 0u64;
×
791
            let mut last_sync_timer = Instant::now();
×
792
            let mut avg_latency = RollingAverageTime::new(20);
×
793

794
            // Accumulate SMT updates for the current tranche only. These are not applied until the full tranche
795
            // stream has been received and the SMT root verified.
796
            let mut state_tree_updates = BTreeMap::<FixedHash, Option<FixedHash>>::new();
×
797
            let mut inputs_to_delete = Vec::new();
×
798
            let mut batch_op_counter = 0;
×
799
            let mut last_mined_header: Option<FixedHash> = None;
×
800

801
            while let Some(response) = output_stream.next().await {
×
802
                let latency = last_sync_timer.elapsed();
×
803
                avg_latency.add_sample(latency);
×
804
                let res: SyncUtxosResponse = response?;
×
805

806
                let output_header_hash = FixedHash::try_from(res.mined_header).map_err(|e| {
×
807
                    HorizonSyncError::IncorrectResponse(format!("Peer sent invalid mined header: {}", e))
×
808
                })?;
×
809
                last_mined_header = Some(output_header_hash);
×
810
                let current_header = self
×
811
                    .db()
×
812
                    .fetch_header_by_block_hash(output_header_hash)
×
813
                    .await?
×
814
                    .ok_or_else(|| {
×
815
                        HorizonSyncError::IncorrectResponse("Peer sent mined header we do not know of".into())
×
816
                    })?;
×
817

818
                let proto_output = res.txo.ok_or_else(|| {
×
819
                    HorizonSyncError::IncorrectResponse("Peer sent no transaction output data".into())
×
820
                })?;
×
821
                match proto_output {
×
822
                    Txo::Output(output) => {
×
823
                        let output = TransactionOutput::try_from(output).map_err(HorizonSyncError::ConversionError)?;
×
824
                        if !output.is_burned() {
×
825
                            utxo_counter += 1;
×
826
                            let output_hash = output.hash();
×
827
                            debug!(
×
828
                                target: LOG_TARGET,
×
829
                                "UTXO `{}` received from sync peer ({} of {})",
830
                                output_hash,
831
                                total_utxo_counter + utxo_counter,
×
832
                                self.num_outputs,
833
                            );
834
                            let key_bytes: [u8; 32] = output.commitment.as_bytes().try_into().map_err(|e| {
×
835
                                HorizonSyncError::IncorrectResponse(format!("Peer sent malformed commitment: {}", e))
×
836
                            })?;
×
837
                            let key = FixedHash::from(key_bytes);
×
838

839
                            if state_tree_updates
×
840
                                .insert(key, Some(output.smt_hash(current_header.height)))
×
841
                                .is_some()
×
842
                            {
843
                                return Err(HorizonSyncError::IncorrectResponse(
×
844
                                    "Peer sent duplicate output commitment during horizon sync".into(),
×
845
                                ));
×
846
                            }
×
847

848
                            let constants = self.rules.consensus_constants(current_header.height).clone();
×
849
                            validate_output_version(&constants, &output)?;
×
850
                            validate_individual_output(&output, &constants)?;
×
851
                            batch_verify_range_proofs(&self.prover, &[&output])?;
×
852

853
                            txn.insert_output_via_horizon_sync(
×
854
                                output,
×
855
                                current_header.hash(),
×
856
                                current_header.height,
×
857
                                current_header.timestamp.as_u64(),
×
858
                            );
859
                            batch_op_counter += 1;
×
860
                        }
×
861
                    },
862
                    Txo::Commitment(commitment_bytes) => {
×
863
                        stxo_counter += 1;
×
864

865
                        let commitment = CompressedCommitment::from_canonical_bytes(commitment_bytes.as_slice())?;
×
866
                        match self
×
867
                            .db()
×
868
                            .fetch_unspent_output_hash_by_commitment(commitment.clone())
×
869
                            .await?
×
870
                        {
871
                            Some(output_hash) => {
×
872
                                debug!(
×
873
                                    target: LOG_TARGET,
×
874
                                    "STXO hash `{output_hash}` received from sync peer ({stxo_counter})",
875
                                );
876
                                let key_bytes: [u8; 32] = commitment_bytes.as_slice().try_into().map_err(|e| {
×
877
                                    HorizonSyncError::IncorrectResponse(format!(
×
878
                                        "Peer sent malformed commitment: {}",
×
879
                                        e
×
880
                                    ))
×
881
                                })?;
×
882
                                let key = FixedHash::from(key_bytes);
×
883

884
                                if matches!(state_tree_updates.get(&key), Some(None)) {
×
885
                                    return Err(HorizonSyncError::ChainStorageError(
×
886
                                        ChainStorageError::UnspendableInput,
×
887
                                    ));
×
888
                                }
×
889
                                state_tree_updates.insert(key, None);
×
890

891
                                let output_info = self.db().fetch_output(output_hash).await?.ok_or_else(|| {
×
892
                                    HorizonSyncError::IncorrectResponse(
×
893
                                        "Could not fetch full output for spent commitment".into(),
×
894
                                    )
×
895
                                })?;
×
896
                                inputs_to_delete.push(output_info.output);
×
897
                            },
898
                            None => {
899
                                return Err(HorizonSyncError::IncorrectResponse(
×
900
                                    "Peer sent unknown commitment hash".into(),
×
901
                                ));
×
902
                            },
903
                        }
904
                    },
905
                }
906

907
                items_processed += 1;
×
908
                if items_processed.is_multiple_of(PROGRESS_REPORT_INTERVAL) {
×
909
                    let utxo_progress = total_utxo_counter + utxo_counter;
×
910
                    let info = HorizonSyncInfo::new(vec![sync_peer.node_id().clone()], HorizonSyncStatus::Outputs {
×
911
                        current: utxo_progress,
×
912
                        total: self.num_outputs,
×
913
                        sync_peer: sync_peer.clone(),
×
914
                    });
×
915
                    self.hooks.call_on_progress_horizon_hooks(info);
×
916
                }
×
917

918
                if batch_op_counter >= HORIZON_SYNC_BATCH_SIZE {
×
919
                    txn.commit().await?;
×
920
                    txn = db.write_transaction();
×
921
                    batch_op_counter = 0;
×
922
                }
×
923

924
                sync_peer.set_latency(latency);
×
925
                sync_peer.add_sample(last_sync_timer.elapsed());
×
926
                last_sync_timer = Instant::now();
×
927
            }
928

929
            // Verify the stream completed the full tranche before committing.
930
            if let Some(last_hash) = last_mined_header &&
×
931
                last_hash != tranche_end_header.hash()
×
932
            {
933
                return Err(HorizonSyncError::IncorrectResponse(format!(
×
934
                    "Sync peer did not complete output stream for tranche {}-{}. Last block received: {}, expected: {}",
×
935
                    tranche_start_height,
×
936
                    tranche_end_height,
×
937
                    last_hash.to_hex(),
×
938
                    tranche_end_header.hash().to_hex()
×
939
                )));
×
940
            }
×
941

942
            let tranche_updates = state_tree_updates
×
943
                .into_iter()
×
944
                .map(|(key, value)| HorizonStateTreeUpdate { key, value })
×
945
                .collect::<Vec<_>>();
×
946

947
            txn.apply_horizon_state_tree_updates(jmt_version, tranche_end_header.height, tranche_updates);
×
948
            for output in &inputs_to_delete {
×
949
                if let Some(sidechain_feature) = output.features.sidechain_feature.as_ref() &&
×
950
                    let Some(vn_reg) = sidechain_feature.validator_node_registration()
×
951
                {
×
952
                    txn.delete_validator_node(
×
953
                        sidechain_feature.sidechain_public_key().cloned(),
×
954
                        vn_reg.public_key().clone(),
×
955
                    );
×
956
                }
×
957
            }
958
            for output in inputs_to_delete {
×
959
                txn.prune_output_from_all_dbs(output.hash(), output.commitment.clone(), output.features.output_type);
×
960
            }
×
961
            // Only checkpoint intermediate tranches for network recovery.
962
            // We intentionally do not checkpoint the final tranche before verification.
963
            // If the final root verification fails, the ENTIRE state is considered poisoned.
964
            if tranche_end_height < to_header.height {
×
965
                txn.set_horizon_sync_output_checkpoint(HorizonSyncOutputCheckpoint {
×
966
                    checkpoint_height: tranche_end_height,
×
967
                    checkpoint_hash: tranche_end_header.hash(),
×
968
                    sync_target_height: to_header.height,
×
969
                    sync_target_hash: to_header.hash(),
×
970
                });
×
971
            }
×
972
            txn.commit().await?;
×
973
            jmt_version = tranche_end_height;
×
974

975
            debug!(
×
976
                target: LOG_TARGET,
×
977
                "Committed output tranche heights {}-{}: {} UTXOs and {} STXOs",
978
                tranche_start_height,
979
                tranche_end_height,
980
                utxo_counter,
981
                stxo_counter,
982
            );
983

984
            total_utxo_counter += utxo_counter;
×
985
            total_stxo_counter += stxo_counter;
×
986
            tranche_start_height = tranche_end_height + 1;
×
987
        }
988

989
        if let Err(e) = db
×
990
            .verify_horizon_sync_output_root(to_header.height, to_header.output_mr)
×
991
            .await
×
992
        {
993
            warn!(
×
994
                target: LOG_TARGET,
×
995
                "Final JMT root verification failed! The entire synced state is poisoned. Clearing checkpoint."
996
            );
997
            let _unused = db
×
998
                .write_transaction()
×
999
                .clear_horizon_sync_output_checkpoint()
×
1000
                .commit()
×
1001
                .await;
×
1002
            return Err(HorizonSyncError::ChainStorageError(e));
×
1003
        }
×
1004

1005
        // Mark output sync as complete
1006
        db.write_transaction()
×
1007
            .set_horizon_sync_output_checkpoint(HorizonSyncOutputCheckpoint {
×
1008
                checkpoint_height: to_header.height,
×
1009
                checkpoint_hash: to_header.hash(),
×
1010
                sync_target_height: to_header.height,
×
1011
                sync_target_hash: to_header.hash(),
×
1012
            })
×
1013
            .commit()
×
1014
            .await?;
×
1015

1016
        debug!(
×
1017
            target: LOG_TARGET,
×
1018
            "Finished syncing TXOs: {} unspent and {} spent downloaded in {:.2?}",
1019
            total_utxo_counter,
1020
            total_stxo_counter,
1021
            timer.elapsed()
×
1022
        );
1023
        Ok(())
×
1024
    }
×
1025

1026
    // Finalize the horizon state synchronization by setting the chain metadata to the local tip and committing
1027
    // the horizon state to the blockchain backend.
1028
    async fn finalize_horizon_sync(&mut self, sync_peer: &SyncPeer) -> Result<(), HorizonSyncError> {
×
1029
        debug!(target: LOG_TARGET, "Validating horizon state");
×
1030

1031
        self.hooks.call_on_progress_horizon_hooks(HorizonSyncInfo::new(
×
1032
            vec![sync_peer.node_id().clone()],
×
1033
            HorizonSyncStatus::Finalizing,
×
1034
        ));
1035

1036
        let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
×
1037
        let (calc_utxo_sum, calc_kernel_sum, calc_burned_sum) = self.calculate_commitment_sums(&header).await?;
×
1038

1039
        self.final_state_validator
×
1040
            .validate(
×
1041
                &*self.db().inner().db_read_access()?,
×
1042
                header.height(),
×
1043
                &calc_utxo_sum,
×
1044
                &calc_kernel_sum,
×
1045
                &calc_burned_sum,
×
1046
            )
1047
            .map_err(HorizonSyncError::FinalStateValidationFailed)?;
×
1048

1049
        let metadata = self.db().get_chain_metadata().await?;
×
1050
        info!(
×
1051
            target: LOG_TARGET,
×
1052
            "Horizon state validation succeeded! Committing horizon state."
1053
        );
1054
        self.db()
×
1055
            .write_transaction()
×
1056
            .set_best_block(
×
1057
                header.height(),
×
1058
                *header.hash(),
×
1059
                header.accumulated_data().total_accumulated_difficulty,
×
1060
                *metadata.best_block_hash(),
×
1061
                header.timestamp(),
×
1062
            )
×
1063
            .set_pruned_height(header.height())
×
1064
            .set_horizon_data(calc_kernel_sum, calc_utxo_sum)
×
1065
            .clear_horizon_sync_output_checkpoint()
×
1066
            .commit()
×
1067
            .await?;
×
1068

1069
        Ok(())
×
1070
    }
×
1071

1072
    /// (UTXO sum, Kernel sum)
1073
    async fn calculate_commitment_sums(
×
1074
        &mut self,
×
1075
        header: &ChainHeader,
×
1076
    ) -> Result<(CompressedCommitment, CompressedCommitment, CompressedCommitment), HorizonSyncError> {
×
1077
        let mut utxo_sum = HomomorphicCommitment::default();
×
1078
        let mut kernel_sum = HomomorphicCommitment::default();
×
1079
        let mut burned_sum = HomomorphicCommitment::default();
×
1080

1081
        let mut prev_kernel_mmr = 0;
×
1082

1083
        let height = header.height();
×
1084
        let db = self.db().inner().clone();
×
1085
        let header_hash = *header.hash();
×
1086
        task::spawn_blocking(move || {
×
1087
            for h in 0..=height {
×
1088
                let curr_header = db.fetch_chain_header(h)?;
×
1089
                trace!(
×
1090
                    target: LOG_TARGET,
×
1091
                    "Fetching utxos from db: height:{}",
1092
                    curr_header.height(),
×
1093
                );
1094
                let utxos = db.fetch_outputs_in_block_with_spend_state(*curr_header.hash(), Some(header_hash))?;
×
1095
                debug!(
×
1096
                    target: LOG_TARGET,
×
1097
                    "{} output(s) loaded for height {}",
1098
                    utxos.len(),
×
1099
                    curr_header.height()
×
1100
                );
1101
                trace!(
×
1102
                    target: LOG_TARGET,
×
1103
                    "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
1104
                    curr_header.height(),
×
1105
                    curr_header.header().kernel_mmr_size,
×
1106
                    prev_kernel_mmr,
1107
                    curr_header.header().kernel_mmr_size.saturating_sub(1)
×
1108
                );
1109

1110
                trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
×
1111
                for (u, spent) in utxos {
×
1112
                    if !spent {
×
1113
                        utxo_sum = &u.commitment.to_commitment()? + &utxo_sum;
×
1114
                    }
×
1115
                }
1116

1117
                let kernels = db.fetch_kernels_in_block(*curr_header.hash())?;
×
1118
                trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
×
1119
                for k in kernels {
×
1120
                    kernel_sum = &k.excess.to_commitment()? + &kernel_sum;
×
1121
                    if k.is_burned() {
×
1122
                        burned_sum = &(k.get_burn_commitment()?.to_commitment()?) + &burned_sum;
×
1123
                    }
×
1124
                }
1125
                prev_kernel_mmr = curr_header.header().kernel_mmr_size;
×
1126

1127
                if h % 1000 == 0 && height != 0 {
×
1128
                    debug!(
×
1129
                        target: LOG_TARGET,
×
1130
                        "Final Validation: {:.2}% complete. Height: {} sync",
1131
                        (h as f32 / height as f32) * 100.0,
×
1132
                        h,
1133
                    );
1134
                }
×
1135
            }
1136

1137
            Ok((
×
1138
                CompressedCommitment::from_commitment(utxo_sum),
×
1139
                CompressedCommitment::from_commitment(kernel_sum),
×
1140
                CompressedCommitment::from_commitment(burned_sum),
×
1141
            ))
×
1142
        })
×
1143
        .await?
×
1144
    }
×
1145

1146
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
1147
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
×
1148
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
×
1149
            self.sync_peers.remove(pos);
×
1150
        }
×
1151
    }
×
1152

1153
    // Helper function to get the index to the node_id inside of the vec of peers
1154
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
×
1155
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
×
1156
    }
×
1157

1158
    #[inline]
1159
    fn db(&self) -> &AsyncBlockchainDb<B> {
×
1160
        &self.db
×
1161
    }
×
1162
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc