• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 19920181775

04 Dec 2025 06:43AM UTC coverage: 60.517% (-0.3%) from 60.819%
19920181775

push

github

web-flow
feat: improve scanning feedback (#7622)

Description
---
Improve the scanning feedback

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Chores**
* Enhanced state inspection and logging for UTXO processing in wallet
server operations, including mined and deletion status tracking.
* Added Display formatting for transaction types to support improved
debugging and logging output.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

0 of 26 new or added lines in 1 file covered. (0.0%)

532 existing lines in 21 files now uncovered.

70369 of 116280 relevant lines covered (60.52%)

299331.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::{TryFrom, TryInto},
25
    sync::Arc,
26
    time::{Duration, Instant},
27
};
28

29
use futures::StreamExt;
30
use log::*;
31
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
32
use tari_node_components::blocks::{Block, ChainBlock};
33
use tari_transaction_components::{aggregated_body::AggregateBody, BanPeriod};
34
use tari_utilities::hex::Hex;
35

36
use super::error::BlockSyncError;
37
use crate::{
38
    base_node::{
39
        sync::{ban::PeerBanManager, hooks::Hooks, rpc, SyncPeer},
40
        BlockchainSyncConfig,
41
    },
42
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
43
    common::rolling_avg::RollingAverageTime,
44
    proto::base_node::SyncBlocksRequest,
45
    validation::{BlockBodyValidator, ValidationError},
46
};
47
/// Log target passed to the logging macros in this module.
const LOG_TARGET: &str = "c::bn::block_sync";
48

49
/// Number of times `synchronize` may raise the maximum allowed sync latency; once the number of increases exceeds
/// this value the `AllSyncPeersExceedLatency` error is returned to the caller instead of retrying.
const MAX_LATENCY_INCREASES: usize = 5;
50

51
/// Downloads block bodies from sync peers, validates them and stores them in the local blockchain database.
///
/// The synchronizer borrows the caller's list of sync peers mutably and removes peers from it as they fail.
pub struct BlockSynchronizer<'a, B> {
52
    /// Sync settings: initial/incremental max latency, RPC deadline and ban periods are read from here.
    config: BlockchainSyncConfig,
53
    /// Async handle to the blockchain database that headers are read from and block bodies are written to.
    db: AsyncBlockchainDb<B>,
54
    /// Used to dial sync peers.
    connectivity: ConnectivityRequester,
55
    /// Candidate peers to sync from; entries are removed when connecting to or syncing from a peer fails.
    sync_peers: &'a mut Vec<SyncPeer>,
56
    /// Validates each received block body before it is stored.
    block_validator: Arc<dyn BlockBodyValidator<B>>,
57
    /// Callbacks registered through `on_starting`, `on_progress` and `on_complete`.
    hooks: Hooks,
58
    /// Bans peers whose sync error carries a ban reason.
    peer_ban_manager: PeerBanManager,
59
}
60

61
impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
UNCOV
62
    pub fn new(
×
UNCOV
63
        config: BlockchainSyncConfig,
×
UNCOV
64
        db: AsyncBlockchainDb<B>,
×
UNCOV
65
        connectivity: ConnectivityRequester,
×
UNCOV
66
        sync_peers: &'a mut Vec<SyncPeer>,
×
UNCOV
67
        block_validator: Arc<dyn BlockBodyValidator<B>>,
×
UNCOV
68
    ) -> Self {
×
UNCOV
69
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
×
UNCOV
70
        Self {
×
UNCOV
71
            config,
×
UNCOV
72
            db,
×
UNCOV
73
            connectivity,
×
UNCOV
74
            sync_peers,
×
UNCOV
75
            block_validator,
×
UNCOV
76
            hooks: Default::default(),
×
UNCOV
77
            peer_ban_manager,
×
UNCOV
78
        }
×
UNCOV
79
    }
×
80

UNCOV
81
    pub fn on_starting<H>(&mut self, hook: H)
×
UNCOV
82
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
×
UNCOV
83
        self.hooks.add_on_starting_hook(hook);
×
UNCOV
84
    }
×
85

UNCOV
86
    pub fn on_progress<H>(&mut self, hook: H)
×
UNCOV
87
    where H: Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync + 'static {
×
UNCOV
88
        self.hooks.add_on_progress_block_hook(hook);
×
UNCOV
89
    }
×
90

UNCOV
91
    pub fn on_complete<H>(&mut self, hook: H)
×
UNCOV
92
    where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
×
UNCOV
93
        self.hooks.add_on_complete_hook(hook);
×
UNCOV
94
    }
×
95

UNCOV
96
    pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
×
UNCOV
97
        let mut max_latency = self.config.initial_max_sync_latency;
×
UNCOV
98
        let mut sync_round = 0;
×
UNCOV
99
        let mut latency_increases_counter = 0;
×
100
        loop {
UNCOV
101
            match self.attempt_block_sync(max_latency).await {
×
UNCOV
102
                Ok(_) => return Ok(()),
×
103
                Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
×
104
                    warn!(target: LOG_TARGET, "{err}");
×
105
                    max_latency += self.config.max_latency_increase;
×
106
                    warn!(
×
107
                        target: LOG_TARGET,
×
108
                        "Retrying block sync with increased max latency {:.2?} with {} sync peers",
×
109
                        max_latency,
110
                        self.sync_peers.len()
×
111
                    );
112
                    latency_increases_counter += 1;
×
113
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
114
                        return Err(err);
×
115
                    }
×
116
                    // Prohibit using a few slow sync peers only, rather get new sync peers assigned
117
                    if self.sync_peers.len() < 2 {
×
118
                        return Err(err);
×
119
                    } else {
120
                        continue;
×
121
                    }
122
                },
123
                Err(err @ BlockSyncError::SyncRoundFailed) => {
×
124
                    sync_round += 1;
×
125
                    warn!(target: LOG_TARGET, "{err} ({sync_round})");
×
126
                    continue;
×
127
                },
128
                Err(err) => {
×
129
                    return Err(err);
×
130
                },
131
            }
132
        }
UNCOV
133
    }
×
134

UNCOV
135
    async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
×
UNCOV
136
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
×
UNCOV
137
        info!(
×
138
            target: LOG_TARGET,
×
139
            "Attempting to sync blocks({} sync peers)",
×
140
            sync_peer_node_ids.len()
×
141
        );
UNCOV
142
        let mut latency_counter = 0usize;
×
UNCOV
143
        for node_id in sync_peer_node_ids {
×
UNCOV
144
            let peer_index = self.get_sync_peer_index(&node_id).ok_or(BlockSyncError::PeerNotFound)?;
×
UNCOV
145
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
×
UNCOV
146
            self.hooks.call_on_starting_hook(sync_peer);
×
UNCOV
147
            let mut conn = match self.connect_to_sync_peer(node_id.clone()).await {
×
UNCOV
148
                Ok(val) => val,
×
149
                Err(e) => {
×
150
                    warn!(
×
151
                        target: LOG_TARGET,
×
152
                        "Failed to connect to sync peer `{node_id}`: {e}"
×
153
                    );
154
                    self.remove_sync_peer(&node_id);
×
155
                    continue;
×
156
                },
157
            };
UNCOV
158
            let config = RpcClient::builder()
×
UNCOV
159
                .with_deadline(self.config.rpc_deadline)
×
UNCOV
160
                .with_deadline_grace_period(Duration::from_secs(5));
×
UNCOV
161
            let mut client = match conn
×
UNCOV
162
                .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
×
UNCOV
163
                .await
×
164
            {
UNCOV
165
                Ok(val) => val,
×
166
                Err(e) => {
×
167
                    warn!(
×
168
                        target: LOG_TARGET,
×
169
                        "Failed to obtain RPC connection from sync peer `{node_id}`: {e}"
×
170
                    );
171
                    self.remove_sync_peer(&node_id);
×
172
                    continue;
×
173
                },
174
            };
UNCOV
175
            let latency = client
×
UNCOV
176
                .get_last_request_latency()
×
UNCOV
177
                .expect("unreachable panic: last request latency must be set after connect");
×
UNCOV
178
            self.sync_peers
×
UNCOV
179
                .get_mut(peer_index)
×
UNCOV
180
                .expect("Already checked")
×
UNCOV
181
                .set_latency(latency);
×
UNCOV
182
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked").clone();
×
UNCOV
183
            info!(
×
184
                target: LOG_TARGET,
×
185
                "Attempting to synchronize blocks with `{node_id}` latency: {latency:.2?}"
×
186
            );
UNCOV
187
            match self.synchronize_blocks(sync_peer, client, max_latency).await {
×
UNCOV
188
                Ok(_) => return Ok(()),
×
189
                Err(err) => {
×
190
                    warn!(target: LOG_TARGET, "{err}");
×
191
                    let ban_reason = BlockSyncError::get_ban_reason(&err);
×
192
                    if let Some(reason) = ban_reason {
×
193
                        let duration = match reason.ban_duration {
×
194
                            BanPeriod::Short => self.config.short_ban_period,
×
195
                            BanPeriod::Long => self.config.ban_period,
×
196
                        };
197
                        self.peer_ban_manager
×
198
                            .ban_peer_if_required(&node_id, reason.reason, duration)
×
199
                            .await;
×
200
                    }
×
201
                    if let BlockSyncError::MaxLatencyExceeded { .. } = err {
×
202
                        latency_counter += 1;
×
203
                    } else {
×
204
                        self.remove_sync_peer(&node_id);
×
205
                    }
×
206
                },
207
            }
208
        }
209

210
        if self.sync_peers.is_empty() {
×
211
            Err(BlockSyncError::NoMoreSyncPeers("Block sync failed".to_string()))
×
212
        } else if latency_counter >= self.sync_peers.len() {
×
213
            Err(BlockSyncError::AllSyncPeersExceedLatency)
×
214
        } else {
215
            Err(BlockSyncError::SyncRoundFailed)
×
216
        }
UNCOV
217
    }
×
218

UNCOV
219
    /// Dials `peer` through the connectivity requester and returns the resulting connection; a dial failure is
    /// converted into a `BlockSyncError`.
    async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
        Ok(self.connectivity.dial_peer(peer).await?)
    }
×
223

224
    #[allow(clippy::too_many_lines)]
UNCOV
225
    async fn synchronize_blocks(
×
UNCOV
226
        &mut self,
×
UNCOV
227
        mut sync_peer: SyncPeer,
×
UNCOV
228
        mut client: rpc::BaseNodeSyncRpcClient,
×
UNCOV
229
        max_latency: Duration,
×
UNCOV
230
    ) -> Result<(), BlockSyncError> {
×
UNCOV
231
        info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer.node_id());
×
232

UNCOV
233
        let tip_header = self.db.fetch_last_header().await?;
×
UNCOV
234
        let local_metadata = self.db.get_chain_metadata().await?;
×
235

UNCOV
236
        if tip_header.height <= local_metadata.best_block_height() {
×
237
            debug!(
×
238
                target: LOG_TARGET,
×
239
                "Blocks already synchronized to height {}.", tip_header.height
×
240
            );
241
            return Ok(());
×
UNCOV
242
        }
×
243

UNCOV
244
        let tip_hash = tip_header.hash();
×
UNCOV
245
        let tip_height = tip_header.height;
×
UNCOV
246
        let best_height = local_metadata.best_block_height();
×
UNCOV
247
        let chain_header = self.db.fetch_chain_header(best_height).await?;
×
248

UNCOV
249
        let best_full_block_hash = chain_header.accumulated_data().hash;
×
UNCOV
250
        debug!(
×
251
            target: LOG_TARGET,
×
252
            "Starting block sync from peer `{}`. Current best block is #{} `{}`. Syncing to #{} ({}).",
×
253
            sync_peer,
254
            best_height,
255
            best_full_block_hash.to_hex(),
×
256
            tip_height,
257
            tip_hash.to_hex()
×
258
        );
UNCOV
259
        let request = SyncBlocksRequest {
×
UNCOV
260
            start_hash: best_full_block_hash.to_vec(),
×
UNCOV
261
            // To the tip!
×
UNCOV
262
            end_hash: tip_hash.to_vec(),
×
UNCOV
263
        };
×
264

UNCOV
265
        let mut block_stream = client.sync_blocks(request).await?;
×
UNCOV
266
        let mut prev_hash = best_full_block_hash;
×
UNCOV
267
        let mut current_block = None;
×
UNCOV
268
        let mut last_sync_timer = Instant::now();
×
UNCOV
269
        let mut avg_latency = RollingAverageTime::new(20);
×
UNCOV
270
        while let Some(block_result) = block_stream.next().await {
×
UNCOV
271
            let latency = last_sync_timer.elapsed();
×
UNCOV
272
            avg_latency.add_sample(latency);
×
UNCOV
273
            let block_body_response = block_result?;
×
274

UNCOV
275
            let header = self
×
UNCOV
276
                .db
×
UNCOV
277
                .fetch_chain_header_by_block_hash(block_body_response.hash.clone().try_into()?)
×
UNCOV
278
                .await?
×
UNCOV
279
                .ok_or_else(|| {
×
280
                    BlockSyncError::UnknownHeaderHash(format!(
×
281
                        "Peer sent hash ({}) for block header we do not have",
×
282
                        block_body_response.hash.to_hex()
×
283
                    ))
×
284
                })?;
×
285

UNCOV
286
            let current_height = header.height();
×
UNCOV
287
            let header_hash = *header.hash();
×
UNCOV
288
            let timestamp = header.timestamp();
×
289

UNCOV
290
            if header.header().prev_hash != prev_hash {
×
291
                return Err(BlockSyncError::BlockWithoutParent {
×
292
                    expected: prev_hash.to_hex(),
×
293
                    got: header.header().prev_hash.to_hex(),
×
294
                });
×
UNCOV
295
            }
×
296

UNCOV
297
            prev_hash = header_hash;
×
298

UNCOV
299
            let body = block_body_response
×
UNCOV
300
                .body
×
UNCOV
301
                .map(AggregateBody::try_from)
×
UNCOV
302
                .ok_or_else(|| BlockSyncError::InvalidBlockBody("Peer sent empty block".to_string()))?
×
UNCOV
303
                .map_err(BlockSyncError::InvalidBlockBody)?;
×
304

UNCOV
305
            debug!(
×
306
                target: LOG_TARGET,
×
307
                "Validating block body #{} (PoW = {}, {}, latency: {:.2?})",
×
308
                current_height,
309
                header.header().pow_algo(),
×
310
                body.to_counts_string(),
×
311
                latency
312
            );
313

UNCOV
314
            let timer = Instant::now();
×
UNCOV
315
            let (header, header_accum_data) = header.into_parts();
×
UNCOV
316
            let block = Block::new(header, body);
×
317

318
            // Validate the block inside a tokio task
UNCOV
319
            let task_block = block.clone();
×
UNCOV
320
            let db = self.db.inner().clone();
×
UNCOV
321
            let validator = self.block_validator.clone();
×
UNCOV
322
            let res = {
×
UNCOV
323
                let txn = db.db_read_access()?;
×
UNCOV
324
                validator.validate_body(&*txn, &task_block)
×
325
            };
326

UNCOV
327
            let block = match res {
×
UNCOV
328
                Ok(block) => block,
×
329
                Err(err @ ValidationError::BadBlockFound { .. }) | Err(err @ ValidationError::FatalStorageError(_)) => {
×
330
                    return Err(err.into());
×
331
                },
332
                Err(err) => {
×
333
                    // Add to bad blocks
334
                    if let Err(err) = self
×
335
                        .db
×
336
                        .write_transaction()
×
337
                        .delete_orphan(header_hash)
×
338
                        .insert_bad_block(header_hash, current_height, err.to_string())
×
339
                        .commit()
×
340
                        .await
×
341
                    {
342
                        error!(target: LOG_TARGET, "Failed to insert bad block: {err}");
×
343
                    }
×
344
                    return Err(err.into());
×
345
                },
346
            };
347

UNCOV
348
            let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
×
UNCOV
349
                .map(Arc::new)
×
UNCOV
350
                .ok_or(BlockSyncError::FailedToConstructChainBlock)?;
×
351

UNCOV
352
            debug!(
×
353
                target: LOG_TARGET,
×
354
                "Validated in {:.0?}. Storing block body #{} (PoW = {}, {})",
×
355
                timer.elapsed(),
×
356
                block.header().height,
×
357
                block.header().pow_algo(),
×
358
                block.block().body.to_counts_string(),
×
359
            );
UNCOV
360
            trace!(
×
361
                target: LOG_TARGET,
×
362
                "{block}"
×
363
            );
364

UNCOV
365
            self.db
×
UNCOV
366
                .write_transaction()
×
UNCOV
367
                .delete_orphan(header_hash)
×
UNCOV
368
                .insert_tip_block_body(block.clone())
×
UNCOV
369
                .set_best_block(
×
UNCOV
370
                    block.height(),
×
UNCOV
371
                    header_hash,
×
UNCOV
372
                    block.accumulated_data().total_accumulated_difficulty,
×
UNCOV
373
                    block.header().prev_hash,
×
UNCOV
374
                    timestamp,
×
UNCOV
375
                )
×
UNCOV
376
                .commit()
×
UNCOV
377
                .await?;
×
378

379
            // Average time between receiving blocks from the peer - used to detect a slow sync peer
UNCOV
380
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
×
UNCOV
381
            if let Some(latency) = last_avg_latency {
×
382
                sync_peer.set_latency(latency);
×
UNCOV
383
            }
×
384
            // Includes time to add block to database, used to show blocks/s on status line
UNCOV
385
            sync_peer.add_sample(last_sync_timer.elapsed());
×
UNCOV
386
            self.hooks
×
UNCOV
387
                .call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);
×
388

UNCOV
389
            if let Some(avg_latency) = last_avg_latency {
×
390
                if avg_latency > max_latency {
×
391
                    return Err(BlockSyncError::MaxLatencyExceeded {
×
392
                        peer: sync_peer.node_id().clone(),
×
393
                        latency: avg_latency,
×
394
                        max_latency,
×
395
                    });
×
396
                }
×
UNCOV
397
            }
×
398

UNCOV
399
            current_block = Some(block);
×
UNCOV
400
            last_sync_timer = Instant::now();
×
401
        }
UNCOV
402
        debug!(
×
403
            "Sync peer claim at start  - height: {}, accumulated difficulty: {}",
×
404
            sync_peer.claimed_chain_metadata().best_block_height(),
×
405
            sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
406
        );
UNCOV
407
        debug!(
×
408
            "Our best header at start  - height: {}, accumulated difficulty: {}",
×
409
            best_height,
410
            chain_header.accumulated_data().total_accumulated_difficulty,
×
411
        );
UNCOV
412
        let metadata_after_sync = self.db.get_chain_metadata().await?;
×
UNCOV
413
        debug!(
×
414
            "Our best block after sync - height: {}, accumulated difficulty: {}",
×
415
            metadata_after_sync.best_block_height(),
×
416
            metadata_after_sync.accumulated_difficulty(),
×
417
        );
418

UNCOV
419
        if metadata_after_sync.accumulated_difficulty() < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
×
420
            return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
×
421
                "Their claim - height: {}, accumulated difficulty: {}. Our status after block sync - height: {}, \
×
422
                 accumulated difficulty: {}",
×
423
                sync_peer.claimed_chain_metadata().best_block_height(),
×
424
                sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
425
                metadata_after_sync.best_block_height(),
×
426
                metadata_after_sync.accumulated_difficulty(),
×
427
            )));
×
UNCOV
428
        }
×
429

UNCOV
430
        if let Some(block) = current_block {
×
UNCOV
431
            self.hooks.call_on_complete_hooks(block, best_height);
×
UNCOV
432
        }
×
433

UNCOV
434
        debug!(target: LOG_TARGET, "Completed block sync with peer `{sync_peer}`");
×
435

UNCOV
436
        Ok(())
×
UNCOV
437
    }
×
438

439
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
440
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
×
441
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
×
442
            self.sync_peers.remove(pos);
×
443
        }
×
444
    }
×
445

446
    // Helper function to get the index to the node_id inside of the vec of peers
UNCOV
447
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
×
UNCOV
448
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
×
UNCOV
449
    }
×
450
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc