• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 13947560149

19 Mar 2025 01:20PM UTC coverage: 73.397% (-0.2%) from 73.603%
13947560149

push

github

web-flow
feat: add num connections to network state (#6884)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- The network state now reports the current number of active peer
connections, giving users improved visibility into connectivity.
- A new field for the number of connections has been added to the
network state response.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

83004 of 113089 relevant lines covered (73.4%)

258041.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.8
/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::{TryFrom, TryInto},
25
    sync::{atomic::AtomicBool, Arc},
26
    time::{Duration, Instant},
27
};
28

29
use futures::StreamExt;
30
use log::*;
31
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
32
use tari_utilities::hex::Hex;
33
use tokio::task;
34

35
use super::error::BlockSyncError;
36
use crate::{
37
    base_node::{
38
        sync::{ban::PeerBanManager, hooks::Hooks, rpc, SyncPeer},
39
        BlockchainSyncConfig,
40
    },
41
    blocks::{Block, ChainBlock},
42
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
43
    common::{rolling_avg::RollingAverageTime, BanPeriod},
44
    proto::base_node::SyncBlocksRequest,
45
    transactions::aggregated_body::AggregateBody,
46
    validation::{BlockBodyValidator, ValidationError},
47
};
48

49
const LOG_TARGET: &str = "c::bn::block_sync";
50

51
const MAX_LATENCY_INCREASES: usize = 5;
52

53
/// Synchronizes block bodies from remote sync peers into the local blockchain database.
///
/// The synchronizer borrows the caller's list of sync peers mutably and removes entries from it when a peer
/// fails (see `remove_sync_peer`), so the caller observes the pruned list once synchronization returns.
pub struct BlockSynchronizer<'a, B> {
54
    // Sync settings; this file reads the latency limits, the RPC deadline and the ban periods from it.
    config: BlockchainSyncConfig,
55
    // Async handle to the local blockchain database: headers are read from it and block bodies written to it.
    db: AsyncBlockchainDb<B>,
56
    // Used to dial sync peers.
    connectivity: ConnectivityRequester,
57
    // Candidate sync peers, borrowed from the caller; entries are removed as peers fail.
    sync_peers: &'a mut Vec<SyncPeer>,
58
    // Validates each received block body before it is stored.
    block_validator: Arc<dyn BlockBodyValidator<B>>,
59
    // Callbacks registered through `on_starting`, `on_progress` and `on_complete`.
    hooks: Hooks,
60
    // Applies bans to peers whose sync error carries a ban reason.
    peer_ban_manager: PeerBanManager,
61
}
62

63
impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
64
    pub fn new(
8✔
65
        config: BlockchainSyncConfig,
8✔
66
        db: AsyncBlockchainDb<B>,
8✔
67
        connectivity: ConnectivityRequester,
8✔
68
        sync_peers: &'a mut Vec<SyncPeer>,
8✔
69
        block_validator: Arc<dyn BlockBodyValidator<B>>,
8✔
70
    ) -> Self {
8✔
71
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
8✔
72
        Self {
8✔
73
            config,
8✔
74
            db,
8✔
75
            connectivity,
8✔
76
            sync_peers,
8✔
77
            block_validator,
8✔
78
            hooks: Default::default(),
8✔
79
            peer_ban_manager,
8✔
80
        }
8✔
81
    }
8✔
82

83
    pub fn on_starting<H>(&mut self, hook: H)
8✔
84
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
8✔
85
        self.hooks.add_on_starting_hook(hook);
8✔
86
    }
8✔
87

88
    pub fn on_progress<H>(&mut self, hook: H)
8✔
89
    where H: Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync + 'static {
8✔
90
        self.hooks.add_on_progress_block_hook(hook);
8✔
91
    }
8✔
92

93
    pub fn on_complete<H>(&mut self, hook: H)
8✔
94
    where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
8✔
95
        self.hooks.add_on_complete_hook(hook);
8✔
96
    }
8✔
97

98
    pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
8✔
99
        let mut max_latency = self.config.initial_max_sync_latency;
8✔
100
        let mut sync_round = 0;
8✔
101
        let mut latency_increases_counter = 0;
8✔
102
        loop {
103
            match self.attempt_block_sync(max_latency).await {
132✔
104
                Ok(_) => return Ok(()),
6✔
105
                Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
×
106
                    warn!(target: LOG_TARGET, "{}", err);
×
107
                    max_latency += self.config.max_latency_increase;
×
108
                    warn!(
×
109
                        target: LOG_TARGET,
×
110
                        "Retrying block sync with increased max latency {:.2?} with {} sync peers",
×
111
                        max_latency,
×
112
                        self.sync_peers.len()
×
113
                    );
114
                    latency_increases_counter += 1;
×
115
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
×
116
                        return Err(err);
×
117
                    }
×
118
                    // Prohibit using a few slow sync peers only, rather get new sync peers assigned
×
119
                    if self.sync_peers.len() < 2 {
×
120
                        return Err(err);
×
121
                    } else {
122
                        continue;
×
123
                    }
124
                },
125
                Err(err @ BlockSyncError::SyncRoundFailed) => {
×
126
                    sync_round += 1;
×
127
                    warn!(target: LOG_TARGET, "{} ({})", err, sync_round);
×
128
                    continue;
×
129
                },
130
                Err(err) => {
2✔
131
                    return Err(err);
2✔
132
                },
133
            }
134
        }
135
    }
8✔
136

137
    async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
8✔
138
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
8✔
139
        info!(
8✔
140
            target: LOG_TARGET,
×
141
            "Attempting to sync blocks({} sync peers)",
×
142
            sync_peer_node_ids.len()
×
143
        );
144
        let mut latency_counter = 0usize;
8✔
145
        for node_id in sync_peer_node_ids {
10✔
146
            let peer_index = self.get_sync_peer_index(&node_id).ok_or(BlockSyncError::PeerNotFound)?;
8✔
147
            let sync_peer = &self.sync_peers[peer_index];
8✔
148
            self.hooks.call_on_starting_hook(sync_peer);
8✔
149
            let mut conn = match self.connect_to_sync_peer(node_id.clone()).await {
8✔
150
                Ok(val) => val,
8✔
151
                Err(e) => {
×
152
                    warn!(
×
153
                        target: LOG_TARGET,
×
154
                        "Failed to connect to sync peer `{}`: {}", node_id, e
×
155
                    );
156
                    self.remove_sync_peer(&node_id);
×
157
                    continue;
×
158
                },
159
            };
160
            let config = RpcClient::builder()
8✔
161
                .with_deadline(self.config.rpc_deadline)
8✔
162
                .with_deadline_grace_period(Duration::from_secs(5));
8✔
163
            let mut client = match conn
8✔
164
                .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
8✔
165
                .await
16✔
166
            {
167
                Ok(val) => val,
8✔
168
                Err(e) => {
×
169
                    warn!(
×
170
                        target: LOG_TARGET,
×
171
                        "Failed to obtain RPC connection from sync peer `{}`: {}", node_id, e
×
172
                    );
173
                    self.remove_sync_peer(&node_id);
×
174
                    continue;
×
175
                },
176
            };
177
            let latency = client
8✔
178
                .get_last_request_latency()
8✔
179
                .expect("unreachable panic: last request latency must be set after connect");
8✔
180
            self.sync_peers[peer_index].set_latency(latency);
8✔
181
            let sync_peer = self.sync_peers[peer_index].clone();
8✔
182
            info!(
8✔
183
                target: LOG_TARGET,
×
184
                "Attempting to synchronize blocks with `{}` latency: {:.2?}", node_id, latency
×
185
            );
186
            match self.synchronize_blocks(sync_peer, client, max_latency).await {
108✔
187
                Ok(_) => return Ok(()),
6✔
188
                Err(err) => {
2✔
189
                    warn!(target: LOG_TARGET, "{}", err);
2✔
190
                    let ban_reason = BlockSyncError::get_ban_reason(&err);
2✔
191
                    if let Some(reason) = ban_reason {
2✔
192
                        let duration = match reason.ban_duration {
2✔
193
                            BanPeriod::Short => self.config.short_ban_period,
2✔
194
                            BanPeriod::Long => self.config.ban_period,
×
195
                        };
196
                        self.peer_ban_manager
2✔
197
                            .ban_peer_if_required(&node_id, reason.reason, duration)
2✔
198
                            .await;
×
199
                    }
×
200
                    if let BlockSyncError::MaxLatencyExceeded { .. } = err {
2✔
201
                        latency_counter += 1;
×
202
                    } else {
2✔
203
                        self.remove_sync_peer(&node_id);
2✔
204
                    }
2✔
205
                },
206
            }
207
        }
208

209
        if self.sync_peers.is_empty() {
2✔
210
            Err(BlockSyncError::NoMoreSyncPeers("Block sync failed".to_string()))
2✔
211
        } else if latency_counter >= self.sync_peers.len() {
×
212
            Err(BlockSyncError::AllSyncPeersExceedLatency)
×
213
        } else {
214
            Err(BlockSyncError::SyncRoundFailed)
×
215
        }
216
    }
8✔
217

218
    /// Dials `peer` through the connectivity requester and returns the resulting connection.
    ///
    /// A dial failure is converted into a `BlockSyncError` by the `?` operator.
    async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
        Ok(self.connectivity.dial_peer(peer).await?)
    }
8✔
222

223
    #[allow(clippy::too_many_lines)]
224
    async fn synchronize_blocks(
8✔
225
        &mut self,
8✔
226
        mut sync_peer: SyncPeer,
8✔
227
        mut client: rpc::BaseNodeSyncRpcClient,
8✔
228
        max_latency: Duration,
8✔
229
    ) -> Result<(), BlockSyncError> {
8✔
230
        info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer.node_id());
8✔
231

232
        let tip_header = self.db.fetch_last_header().await?;
8✔
233
        let local_metadata = self.db.get_chain_metadata().await?;
8✔
234

235
        if tip_header.height <= local_metadata.best_block_height() {
8✔
236
            debug!(
1✔
237
                target: LOG_TARGET,
×
238
                "Blocks already synchronized to height {}.", tip_header.height
×
239
            );
240
            return Ok(());
1✔
241
        }
7✔
242

7✔
243
        let tip_hash = tip_header.hash();
7✔
244
        let tip_height = tip_header.height;
7✔
245
        let best_height = local_metadata.best_block_height();
7✔
246
        let chain_header = self.db.fetch_chain_header(best_height).await?;
7✔
247

248
        let best_full_block_hash = chain_header.accumulated_data().hash;
7✔
249
        debug!(
7✔
250
            target: LOG_TARGET,
×
251
            "Starting block sync from peer `{}`. Current best block is #{} `{}`. Syncing to #{} ({}).",
×
252
            sync_peer,
×
253
            best_height,
×
254
            best_full_block_hash.to_hex(),
×
255
            tip_height,
×
256
            tip_hash.to_hex()
×
257
        );
258
        let request = SyncBlocksRequest {
7✔
259
            start_hash: best_full_block_hash.to_vec(),
7✔
260
            // To the tip!
7✔
261
            end_hash: tip_hash.to_vec(),
7✔
262
        };
7✔
263

264
        let mut block_stream = client.sync_blocks(request).await?;
7✔
265
        let mut prev_hash = best_full_block_hash;
7✔
266
        let mut current_block = None;
7✔
267
        let mut last_sync_timer = Instant::now();
7✔
268
        let mut avg_latency = RollingAverageTime::new(20);
7✔
269
        while let Some(block_result) = block_stream.next().await {
29✔
270
            let latency = last_sync_timer.elapsed();
24✔
271
            avg_latency.add_sample(latency);
24✔
272
            let block_body_response = block_result?;
24✔
273

274
            let header = self
22✔
275
                .db
22✔
276
                .fetch_chain_header_by_block_hash(block_body_response.hash.clone().try_into()?)
22✔
277
                .await?
22✔
278
                .ok_or_else(|| {
22✔
279
                    BlockSyncError::UnknownHeaderHash(format!(
×
280
                        "Peer sent hash ({}) for block header we do not have",
×
281
                        block_body_response.hash.to_hex()
×
282
                    ))
×
283
                })?;
22✔
284

285
            let current_height = header.height();
22✔
286
            let header_hash = *header.hash();
22✔
287
            let timestamp = header.timestamp();
22✔
288

22✔
289
            if header.header().prev_hash != prev_hash {
22✔
290
                return Err(BlockSyncError::BlockWithoutParent {
×
291
                    expected: prev_hash.to_hex(),
×
292
                    got: header.header().prev_hash.to_hex(),
×
293
                });
×
294
            }
22✔
295

22✔
296
            prev_hash = header_hash;
22✔
297

298
            let body = block_body_response
22✔
299
                .body
22✔
300
                .map(AggregateBody::try_from)
22✔
301
                .ok_or_else(|| BlockSyncError::InvalidBlockBody("Peer sent empty block".to_string()))?
22✔
302
                .map_err(BlockSyncError::InvalidBlockBody)?;
22✔
303

304
            debug!(
22✔
305
                target: LOG_TARGET,
×
306
                "Validating block body #{} (PoW = {}, {}, latency: {:.2?})",
×
307
                current_height,
×
308
                header.header().pow_algo(),
×
309
                body.to_counts_string(),
×
310
                latency
311
            );
312

313
            let timer = Instant::now();
22✔
314
            let (header, header_accum_data) = header.into_parts();
22✔
315
            let block = Block::new(header, body);
22✔
316

22✔
317
            // Validate the block inside a tokio task
22✔
318
            let task_block = block.clone();
22✔
319
            let db = self.db.inner().clone();
22✔
320
            let validator = self.block_validator.clone();
22✔
321
            let res = task::spawn_blocking(move || {
22✔
322
                let txn = db.db_read_access()?;
22✔
323
                let smt = db.smt().clone();
22✔
324
                validator.validate_body(&*txn, &task_block, smt)
22✔
325
            })
22✔
326
            .await?;
22✔
327

328
            let block = match res {
22✔
329
                Ok(block) => block,
22✔
330
                Err(err @ ValidationError::BadBlockFound { .. }) | Err(err @ ValidationError::FatalStorageError(_)) => {
×
331
                    return Err(err.into());
×
332
                },
333
                Err(err) => {
×
334
                    // Add to bad blocks
335
                    if let Err(err) = self
×
336
                        .db
×
337
                        .write_transaction()
×
338
                        .delete_orphan(header_hash)
×
339
                        .insert_bad_block(header_hash, current_height, err.to_string())
×
340
                        .commit()
×
341
                        .await
×
342
                    {
343
                        error!(target: LOG_TARGET, "Failed to insert bad block: {}", err);
×
344
                    }
×
345
                    return Err(err.into());
×
346
                },
347
            };
348

349
            let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
22✔
350
                .map(Arc::new)
22✔
351
                .ok_or(BlockSyncError::FailedToConstructChainBlock)?;
22✔
352

353
            debug!(
22✔
354
                target: LOG_TARGET,
×
355
                "Validated in {:.0?}. Storing block body #{} (PoW = {}, {})",
×
356
                timer.elapsed(),
×
357
                block.header().height,
×
358
                block.header().pow_algo(),
×
359
                block.block().body.to_counts_string(),
×
360
            );
361
            trace!(
22✔
362
                target: LOG_TARGET,
×
363
                "{}",block
×
364
            );
365

366
            let timer = Instant::now();
22✔
367
            let allow_smt_change = Arc::new(AtomicBool::new(true));
22✔
368
            self.db
22✔
369
                .write_transaction()
22✔
370
                .delete_orphan(header_hash)
22✔
371
                .insert_tip_block_body(block.clone(), self.db.inner().smt(), allow_smt_change.clone())
22✔
372
                .set_best_block(
22✔
373
                    block.height(),
22✔
374
                    header_hash,
22✔
375
                    block.accumulated_data().total_accumulated_difficulty,
22✔
376
                    block.header().prev_hash,
22✔
377
                    timestamp,
22✔
378
                )
22✔
379
                .commit()
22✔
380
                .await?;
22✔
381

382
            // Average time between receiving blocks from the peer - used to detect a slow sync peer
383
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
22✔
384
            if let Some(latency) = last_avg_latency {
22✔
385
                sync_peer.set_latency(latency);
8✔
386
            }
14✔
387
            // Includes time to add block to database, used to show blocks/s on status line
388
            sync_peer.add_sample(last_sync_timer.elapsed());
22✔
389
            self.hooks
22✔
390
                .call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);
22✔
391

22✔
392
            debug!(
22✔
393
                target: LOG_TARGET,
×
394
                "Block body #{} added in {:.0?}, Tot_acc_diff {}, Monero {}, SHA3 {}, latency: {:.2?}",
×
395
                block.height(),
×
396
                timer.elapsed(),
×
397
                block
×
398
                    .accumulated_data()
×
399
                    .total_accumulated_difficulty,
×
400
                block.accumulated_data().accumulated_randomx_difficulty,
×
401
                block.accumulated_data().accumulated_sha3x_difficulty,
×
402
                latency
403
            );
404
            if let Some(avg_latency) = last_avg_latency {
22✔
405
                if avg_latency > max_latency {
8✔
406
                    return Err(BlockSyncError::MaxLatencyExceeded {
×
407
                        peer: sync_peer.node_id().clone(),
×
408
                        latency: avg_latency,
×
409
                        max_latency,
×
410
                    });
×
411
                }
8✔
412
            }
14✔
413

414
            current_block = Some(block);
22✔
415
            last_sync_timer = Instant::now();
22✔
416
        }
417
        debug!(
5✔
418
            "Sync peer claim at start  - height: {}, accumulated difficulty: {}",
×
419
            sync_peer.claimed_chain_metadata().best_block_height(),
×
420
            sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
421
        );
422
        debug!(
5✔
423
            "Our best header at start  - height: {}, accumulated difficulty: {}",
×
424
            best_height,
×
425
            chain_header.accumulated_data().total_accumulated_difficulty,
×
426
        );
427
        let metadata_after_sync = self.db.get_chain_metadata().await?;
5✔
428
        debug!(
5✔
429
            "Our best block after sync - height: {}, accumulated difficulty: {}",
×
430
            metadata_after_sync.best_block_height(),
×
431
            metadata_after_sync.accumulated_difficulty(),
×
432
        );
433

434
        if metadata_after_sync.accumulated_difficulty() < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
5✔
435
            return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
×
436
                "Their claim - height: {}, accumulated difficulty: {}. Our status after block sync - height: {}, \
×
437
                 accumulated difficulty: {}",
×
438
                sync_peer.claimed_chain_metadata().best_block_height(),
×
439
                sync_peer.claimed_chain_metadata().accumulated_difficulty(),
×
440
                metadata_after_sync.best_block_height(),
×
441
                metadata_after_sync.accumulated_difficulty(),
×
442
            )));
×
443
        }
5✔
444

445
        if let Some(block) = current_block {
5✔
446
            self.hooks.call_on_complete_hooks(block, best_height);
5✔
447
        }
5✔
448

449
        debug!(target: LOG_TARGET, "Completed block sync with peer `{}`", sync_peer);
5✔
450

451
        Ok(())
5✔
452
    }
8✔
453

454
    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
455
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
2✔
456
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
2✔
457
            self.sync_peers.remove(pos);
2✔
458
        }
2✔
459
    }
2✔
460

461
    // Helper function to get the index to the node_id inside of the vec of peers
462
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
8✔
463
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
8✔
464
    }
8✔
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc