• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 24836861553

23 Apr 2026 01:04PM UTC coverage: 60.949% (-0.07%) from 61.023%
24836861553

push

github

SWvheerden
chore: new release v5.3.0-pre.12

70768 of 116111 relevant lines covered (60.95%)

224032.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.53
/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::{TryFrom, TryInto},
25
    sync::Arc,
26
    time::{Duration, Instant},
27
};
28

29
use futures::StreamExt;
30
use log::*;
31
use tari_comms::{PeerConnection, connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient};
32
use tari_node_components::blocks::{Block, ChainBlock};
33
use tari_transaction_components::{BanPeriod, aggregated_body::AggregateBody};
34
use tari_utilities::hex::Hex;
35

36
use super::error::BlockSyncError;
37
use crate::{
38
    base_node::{
39
        BlockchainSyncConfig,
40
        sync::{SyncPeer, ban::PeerBanManager, hooks::Hooks, rpc},
41
    },
42
    chain_storage::{BlockchainBackend, async_db::AsyncBlockchainDb},
43
    common::rolling_avg::RollingAverageTime,
44
    proto::base_node::SyncBlocksRequest,
45
    validation::{BlockBodyValidator, ValidationError},
46
};
47
/// Log target for the block synchronizer's log output.
const LOG_TARGET: &str = "c::bn::block_sync";

/// How many times the maximum sync latency may be raised after all sync peers exceeded it. Once the number of
/// `AllSyncPeersExceedLatency` failures goes above this value the sync is abandoned.
const MAX_LATENCY_INCREASES: usize = 5;
50

51
pub struct BlockSynchronizer<'a, B> {
52
    config: BlockchainSyncConfig,
53
    db: AsyncBlockchainDb<B>,
54
    connectivity: ConnectivityRequester,
55
    sync_peers: &'a mut Vec<SyncPeer>,
56
    block_validator: Arc<dyn BlockBodyValidator<B>>,
57
    hooks: Hooks,
58
    peer_ban_manager: PeerBanManager,
59
}
60

61
impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
62
    pub fn new(
5✔
63
        config: BlockchainSyncConfig,
5✔
64
        db: AsyncBlockchainDb<B>,
5✔
65
        connectivity: ConnectivityRequester,
5✔
66
        sync_peers: &'a mut Vec<SyncPeer>,
5✔
67
        block_validator: Arc<dyn BlockBodyValidator<B>>,
5✔
68
    ) -> Self {
5✔
69
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
5✔
70
        Self {
5✔
71
            config,
5✔
72
            db,
5✔
73
            connectivity,
5✔
74
            sync_peers,
5✔
75
            block_validator,
5✔
76
            hooks: Default::default(),
5✔
77
            peer_ban_manager,
5✔
78
        }
5✔
79
    }
5✔
80

81
    pub fn on_starting<H>(&mut self, hook: H)
5✔
82
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
5✔
83
        self.hooks.add_on_starting_hook(hook);
5✔
84
    }
5✔
85

86
    pub fn on_progress<H>(&mut self, hook: H)
5✔
87
    where H: Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync + 'static {
5✔
88
        self.hooks.add_on_progress_block_hook(hook);
5✔
89
    }
5✔
90

91
    pub fn on_complete<H>(&mut self, hook: H)
5✔
92
    where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
5✔
93
        self.hooks.add_on_complete_hook(hook);
5✔
94
    }
5✔
95

96
    pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
5✔
97
        // Protect every sync-candidate peer from being collaterally disconnected by unrelated
98
        // subsystems (e.g. DhtConnectivity's random-pool-full pruning) while this sync runs.
99
        // `add_peer_to_sync_list` returns an `Arc<NodeId>` handle; holding the handles alive for
100
        // the duration of sync keeps each peer on the connectivity manager's sync list. When
101
        // this function returns and the handles drop, the manager's next sweep prunes the
102
        // entries and the peers become eligible for normal disconnect behaviour again.
103
        let mut _sync_guards: Vec<Arc<NodeId>> = Vec::with_capacity(self.sync_peers.len());
5✔
104
        for peer in self.sync_peers.iter() {
5✔
105
            match self.connectivity.add_peer_to_sync_list(peer.node_id().clone()).await {
5✔
106
                Ok(handle) => _sync_guards.push(handle),
5✔
107
                Err(e) => debug!(
×
108
                    target: LOG_TARGET,
×
109
                    "Failed to register sync peer {} on sync list: {e}", peer.node_id()
×
110
                ),
111
            }
112
        }
113

114
        self.synchronize_inner().await
5✔
115
        // `_sync_guards` drops here (success or error), releasing this sync's references on
116
        // each peer's sync-list entry.
117
    }
5✔
118

119
    async fn synchronize_inner(&mut self) -> Result<(), BlockSyncError> {
5✔
120
        let mut max_latency = self.config.initial_max_sync_latency;
5✔
121
        let mut sync_round = 0;
5✔
122
        let mut latency_increases_counter = 0;
5✔
123
        loop {
124
            match self.attempt_block_sync(max_latency).await {
5✔
125
                Ok(_) => return Ok(()),
3✔
126
                Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
×
127
                    warn!(target: LOG_TARGET, "{err}");
×
128
                    max_latency += self.config.max_latency_increase;
×
129
                    warn!(
×
130
                        target: LOG_TARGET,
×
131
                        "Retrying block sync with increased max latency {:.2?} with {} sync peers",
132
                        max_latency,
133
                        self.sync_peers.len()
×
134
                    );
135
                    latency_increases_counter += 1;
×
136
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
137
                        return Err(err);
×
138
                    }
×
139
                    // Prohibit using a few slow sync peers only, rather get new sync peers assigned
140
                    if self.sync_peers.len() < 2 {
×
141
                        return Err(err);
×
142
                    } else {
143
                        continue;
×
144
                    }
145
                },
146
                Err(err @ BlockSyncError::SyncRoundFailed) => {
×
147
                    sync_round += 1;
×
148
                    warn!(target: LOG_TARGET, "{err} ({sync_round})");
×
149
                    continue;
×
150
                },
151
                Err(err) => {
2✔
152
                    return Err(err);
2✔
153
                },
154
            }
155
        }
156
    }
5✔
157

158
    #[allow(clippy::too_many_lines)]
159
    async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
5✔
160
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
5✔
161
        info!(
5✔
162
            target: LOG_TARGET,
×
163
            "Attempting to sync blocks({} sync peers)",
164
            sync_peer_node_ids.len()
×
165
        );
166
        let mut latency_counter = 0usize;
5✔
167
        for node_id in sync_peer_node_ids {
5✔
168
            let peer_index = self.get_sync_peer_index(&node_id).ok_or(BlockSyncError::PeerNotFound)?;
5✔
169
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
5✔
170
            self.hooks.call_on_starting_hook(sync_peer);
5✔
171
            let mut conn = match self.connect_to_sync_peer(node_id.clone()).await {
5✔
172
                Ok(val) => val,
5✔
173
                Err(e) => {
×
174
                    warn!(
×
175
                        target: LOG_TARGET,
×
176
                        "Failed to connect to sync peer `{node_id}`: {e}"
177
                    );
178
                    self.remove_sync_peer(&node_id);
×
179
                    continue;
×
180
                },
181
            };
182
            // Defensive: the connection may have been torn down by another subsystem between
183
            // dial_peer() returning and this point (e.g. DhtConnectivity pruning). Without this
184
            // check we would attempt an RPC negotiation on a dead channel.
185
            if !conn.is_connected() {
5✔
186
                warn!(
×
187
                    target: LOG_TARGET,
×
188
                    "Sync peer `{node_id}` was disconnected before RPC negotiation could begin"
189
                );
190
                self.remove_sync_peer(&node_id);
×
191
                continue;
×
192
            }
5✔
193
            let config = RpcClient::builder()
5✔
194
                .with_deadline(self.config.rpc_deadline)
5✔
195
                .with_deadline_grace_period(Duration::from_secs(5));
5✔
196
            // Bound RPC negotiation: without this the negotiation itself (as opposed to
197
            // individual RPC requests) has no timeout and can wedge the sync loop indefinitely.
198
            let mut client = match tokio::time::timeout(
5✔
199
                self.config.rpc_deadline,
5✔
200
                conn.connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config),
5✔
201
            )
202
            .await
5✔
203
            {
204
                Ok(Ok(val)) => val,
5✔
205
                Ok(Err(e)) => {
×
206
                    warn!(
×
207
                        target: LOG_TARGET,
×
208
                        "Failed to obtain RPC connection from sync peer `{node_id}`: {e}"
209
                    );
210
                    self.remove_sync_peer(&node_id);
×
211
                    continue;
×
212
                },
213
                Err(_) => {
214
                    warn!(
×
215
                        target: LOG_TARGET,
×
216
                        "Timed out establishing RPC connection with sync peer `{node_id}` after {:.2?}",
217
                        self.config.rpc_deadline,
218
                    );
219
                    self.remove_sync_peer(&node_id);
×
220
                    continue;
×
221
                },
222
            };
223
            let latency = client
5✔
224
                .get_last_request_latency()
5✔
225
                .expect("unreachable panic: last request latency must be set after connect");
5✔
226
            self.sync_peers
5✔
227
                .get_mut(peer_index)
5✔
228
                .expect("Already checked")
5✔
229
                .set_latency(latency);
5✔
230
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked").clone();
5✔
231
            info!(
5✔
232
                target: LOG_TARGET,
×
233
                "Attempting to synchronize blocks with `{node_id}` latency: {latency:.2?}"
234
            );
235
            match self.synchronize_blocks(sync_peer, client, max_latency).await {
5✔
236
                Ok(_) => return Ok(()),
3✔
237
                Err(err) => {
2✔
238
                    warn!(target: LOG_TARGET, "{err}");
2✔
239
                    let ban_reason = BlockSyncError::get_ban_reason(&err);
2✔
240
                    if let Some(reason) = ban_reason {
2✔
241
                        let duration = match reason.ban_duration {
2✔
242
                            BanPeriod::Short => self.config.short_ban_period,
2✔
243
                            BanPeriod::Long => self.config.ban_period,
×
244
                        };
245
                        self.peer_ban_manager
2✔
246
                            .ban_peer_if_required(&node_id, reason.reason, duration)
2✔
247
                            .await;
2✔
248
                    }
×
249
                    if let BlockSyncError::MaxLatencyExceeded { .. } = err {
2✔
250
                        latency_counter += 1;
×
251
                    } else {
2✔
252
                        self.remove_sync_peer(&node_id);
2✔
253
                    }
2✔
254
                },
255
            }
256
        }
257

258
        if self.sync_peers.is_empty() {
2✔
259
            Err(BlockSyncError::NoMoreSyncPeers("Block sync failed".to_string()))
2✔
260
        } else if latency_counter >= self.sync_peers.len() {
×
261
            Err(BlockSyncError::AllSyncPeersExceedLatency)
×
262
        } else {
263
            Err(BlockSyncError::SyncRoundFailed)
×
264
        }
265
    }
5✔
266

267
    async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
5✔
268
        let connection = self.connectivity.dial_peer(peer).await?;
5✔
269
        Ok(connection)
5✔
270
    }
5✔
271

272
    #[allow(clippy::too_many_lines)]
273
    async fn synchronize_blocks(
5✔
274
        &mut self,
5✔
275
        mut sync_peer: SyncPeer,
5✔
276
        mut client: rpc::BaseNodeSyncRpcClient,
5✔
277
        max_latency: Duration,
5✔
278
    ) -> Result<(), BlockSyncError> {
5✔
279
        info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer.node_id());
5✔
280

281
        let tip_header = self.db.fetch_last_header().await?;
5✔
282
        let local_metadata = self.db.get_chain_metadata().await?;
5✔
283

284
        if tip_header.height <= local_metadata.best_block_height() {
5✔
285
            debug!(
1✔
286
                target: LOG_TARGET,
×
287
                "Blocks already synchronized to height {}.", tip_header.height
288
            );
289
            return Ok(());
1✔
290
        }
4✔
291

292
        let tip_hash = tip_header.hash();
4✔
293
        let tip_height = tip_header.height;
4✔
294
        let best_height = local_metadata.best_block_height();
4✔
295
        let chain_header = self.db.fetch_chain_header(best_height).await?;
4✔
296

297
        let best_full_block_hash = chain_header.accumulated_data().hash;
4✔
298
        debug!(
4✔
299
            target: LOG_TARGET,
×
300
            "Starting block sync from peer `{}`. Current best block is #{} `{}`. Syncing to #{} ({}).",
301
            sync_peer,
302
            best_height,
303
            best_full_block_hash.to_hex(),
×
304
            tip_height,
305
            tip_hash.to_hex()
×
306
        );
307
        let request = SyncBlocksRequest {
4✔
308
            start_hash: best_full_block_hash.to_vec(),
4✔
309
            // To the tip!
4✔
310
            end_hash: tip_hash.to_vec(),
4✔
311
        };
4✔
312

313
        let mut block_stream = client.sync_blocks(request).await?;
4✔
314
        let mut prev_hash = best_full_block_hash;
4✔
315
        let mut current_block = None;
4✔
316
        let mut last_sync_timer = Instant::now();
4✔
317
        let mut avg_latency = RollingAverageTime::new(20);
4✔
318
        while let Some(block_result) = block_stream.next().await {
19✔
319
            let latency = last_sync_timer.elapsed();
17✔
320
            avg_latency.add_sample(latency);
17✔
321
            let block_body_response = block_result?;
17✔
322

323
            let header = self
15✔
324
                .db
15✔
325
                .fetch_chain_header_by_block_hash(block_body_response.hash.clone().try_into()?)
15✔
326
                .await?
15✔
327
                .ok_or_else(|| {
15✔
328
                    BlockSyncError::UnknownHeaderHash(format!(
×
329
                        "Peer sent hash ({}) for block header we do not have",
×
330
                        block_body_response.hash.to_hex()
×
331
                    ))
×
332
                })?;
×
333

334
            let current_height = header.height();
15✔
335
            let header_hash = *header.hash();
15✔
336
            let timestamp = header.timestamp();
15✔
337

338
            if header.header().prev_hash != prev_hash {
15✔
339
                return Err(BlockSyncError::BlockWithoutParent {
×
340
                    expected: prev_hash.to_hex(),
×
341
                    got: header.header().prev_hash.to_hex(),
×
342
                });
×
343
            }
15✔
344

345
            prev_hash = header_hash;
15✔
346

347
            let body = block_body_response
15✔
348
                .body
15✔
349
                .map(AggregateBody::try_from)
15✔
350
                .ok_or_else(|| BlockSyncError::InvalidBlockBody("Peer sent empty block".to_string()))?
15✔
351
                .map_err(BlockSyncError::InvalidBlockBody)?;
15✔
352

353
            debug!(
15✔
354
                target: LOG_TARGET,
×
355
                "Validating block body #{} (PoW = {}, {}, latency: {:.2?})",
356
                current_height,
357
                header.header().pow_algo(),
×
358
                body.to_counts_string(),
×
359
                latency
360
            );
361

362
            let timer = Instant::now();
15✔
363
            let (header, header_accum_data) = header.into_parts();
15✔
364
            let block = Block::new(header, body);
15✔
365

366
            // Validate the block inside a tokio task
367
            let task_block = block.clone();
15✔
368
            let db = self.db.inner().clone();
15✔
369
            let validator = self.block_validator.clone();
15✔
370
            let res = {
15✔
371
                let txn = db.db_read_access()?;
15✔
372
                validator.validate_body(&*txn, &task_block)
15✔
373
            };
374

375
            let block = match res {
15✔
376
                Ok(block) => block,
15✔
377
                Err(err @ ValidationError::BadBlockFound { .. }) | Err(err @ ValidationError::FatalStorageError(_)) => {
×
378
                    return Err(err.into());
×
379
                },
380
                Err(err) => {
×
381
                    // Add to bad blocks
382
                    if let Err(err) = self
×
383
                        .db
×
384
                        .write_transaction()
×
385
                        .delete_orphan(header_hash)
×
386
                        .insert_bad_block(header_hash, current_height, err.to_string())
×
387
                        .commit()
×
388
                        .await
×
389
                    {
390
                        error!(target: LOG_TARGET, "Failed to insert bad block: {err}");
×
391
                    }
×
392
                    return Err(err.into());
×
393
                },
394
            };
395

396
            let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
15✔
397
                .map(Arc::new)
15✔
398
                .ok_or(BlockSyncError::FailedToConstructChainBlock)?;
15✔
399

400
            debug!(
15✔
401
                target: LOG_TARGET,
×
402
                "Validated in {:.0?}. Storing block body #{} (PoW = {}, {})",
403
                timer.elapsed(),
×
404
                block.header().height,
×
405
                block.header().pow_algo(),
×
406
                block.block().body.to_counts_string(),
×
407
            );
408
            trace!(
15✔
409
                target: LOG_TARGET,
×
410
                "{block}"
411
            );
412

413
            self.db
15✔
414
                .write_transaction()
15✔
415
                .delete_orphan(header_hash)
15✔
416
                .insert_tip_block_body(block.clone())
15✔
417
                .set_best_block(
15✔
418
                    block.height(),
15✔
419
                    header_hash,
15✔
420
                    block.accumulated_data().total_accumulated_difficulty,
15✔
421
                    block.header().prev_hash,
15✔
422
                    timestamp,
15✔
423
                )
15✔
424
                .commit()
15✔
425
                .await?;
15✔
426

427
            // Average time between receiving blocks from the peer - used to detect a slow sync peer
428
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
15✔
429
            if let Some(latency) = last_avg_latency {
15✔
430
                sync_peer.set_latency(latency);
7✔
431
            }
8✔
432
            // Includes time to add block to database, used to show blocks/s on status line
433
            sync_peer.add_sample(last_sync_timer.elapsed());
15✔
434
            self.hooks
15✔
435
                .call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);
15✔
436

437
            if let Some(avg_latency) = last_avg_latency &&
15✔
438
                avg_latency > max_latency
7✔
439
            {
440
                return Err(BlockSyncError::MaxLatencyExceeded {
×
441
                    peer: sync_peer.node_id().clone(),
×
442
                    latency: avg_latency,
×
443
                    max_latency,
×
444
                });
×
445
            }
15✔
446

447
            current_block = Some(block);
15✔
448
            last_sync_timer = Instant::now();
15✔
449
        }
450
        debug!(
2✔
451
            "Sync peer claim at start  - height: {}, accumulated difficulty: {}",
452
            sync_peer.claimed_chain_metadata().best_block_height(),
×
453
            sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
454
        );
455
        debug!(
2✔
456
            "Our best header at start  - height: {}, accumulated difficulty: {}",
457
            best_height,
458
            chain_header.accumulated_data().total_accumulated_difficulty,
×
459
        );
460
        let metadata_after_sync = self.db.get_chain_metadata().await?;
2✔
461
        debug!(
2✔
462
            "Our best block after sync - height: {}, accumulated difficulty: {}",
463
            metadata_after_sync.best_block_height(),
×
464
            metadata_after_sync.accumulated_difficulty(),
×
465
        );
466

467
        if metadata_after_sync.accumulated_difficulty() < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
2✔
468
            return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
×
469
                "Their claim - height: {}, accumulated difficulty: {}. Our status after block sync - height: {}, \
×
470
                 accumulated difficulty: {}",
×
471
                sync_peer.claimed_chain_metadata().best_block_height(),
×
472
                sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
473
                metadata_after_sync.best_block_height(),
×
474
                metadata_after_sync.accumulated_difficulty(),
×
475
            )));
×
476
        }
2✔
477

478
        if let Some(block) = current_block {
2✔
479
            self.hooks.call_on_complete_hooks(block, best_height);
2✔
480
        }
2✔
481

482
        debug!(target: LOG_TARGET, "Completed block sync with peer `{sync_peer}`");
2✔
483

484
        Ok(())
2✔
485
    }
5✔
486

487
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
488
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
2✔
489
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
2✔
490
            self.sync_peers.remove(pos);
2✔
491
        }
2✔
492
    }
2✔
493

494
    // Helper function to get the index to the node_id inside of the vec of peers
495
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
5✔
496
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
5✔
497
    }
5✔
498
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc