tari-project / tari / 16933396277

13 Aug 2025 09:35AM UTC coverage: 54.463% (+0.2%) from 54.254%
push

github

web-flow
feat: add seed peer exclusion to the proactive dialer (#7396)

Description
---
Added seed peer exclusion to proactive dialing when selecting available
candidates from the peer_db.
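
As a rough illustration of the filtering described above, the following minimal sketch drops seed peers (and peers that are already connected) from a candidate list before dialing. The `Peer` struct, its `is_seed` flag, and `select_dial_candidates` are hypothetical names invented for this example; they are not the actual peer_db or dialer API touched by the PR.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a peer record; the real peer_db types are not shown here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Peer {
    pub node_id: String,
    pub is_seed: bool,
}

/// Returns up to `limit` dial candidates, skipping seed peers and peers
/// that already have an active connection.
pub fn select_dial_candidates(peers: &[Peer], connected: &HashSet<String>, limit: usize) -> Vec<Peer> {
    peers
        .iter()
        // Seed peers were already dialled during start-up, so leave them out.
        .filter(|p| !p.is_seed)
        // No need to dial a peer we are already connected to.
        .filter(|p| !connected.contains(&p.node_id))
        .take(limit)
        .cloned()
        .collect()
}
```

With one seed peer and two ordinary peers, one of which is already connected, this sketch would return only the remaining unconnected, non-seed peer.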

Motivation and Context
---
Seed peers are known entities that have already been dialled during the
initial seed_strap. A well-connected network should also try to learn about
and connect to other peers.

How Has This Been Tested?
---
System-level testing.

What process can a PR reviewer use to test or verify this change?
---
Code review.


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify


## Summary by CodeRabbit

* **Bug Fixes**
  * Improved connection management by excluding seed peers from proactive
    dialing candidates, enhancing network stability and reducing unnecessary
    connection attempts and failed dials.

* **Documentation**
  * Added a brief doc comment describing how to retrieve the list of seed
    peers.

* **Tests**
  * Expanded test coverage to validate discovery and syncing behavior when
    seed peers are present and when filtering by external addresses.

41 of 42 new or added lines in 3 files covered. (97.62%)

1673 existing lines in 28 files now uncovered.

76415 of 140305 relevant lines covered (54.46%)

194087.6 hits per line

Source File
0.0
/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    convert::{TryFrom, TryInto},
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
use tari_utilities::hex::Hex;

use super::error::BlockSyncError;
use crate::{
    base_node::{
        sync::{ban::PeerBanManager, hooks::Hooks, rpc, SyncPeer},
        BlockchainSyncConfig,
    },
    blocks::{Block, ChainBlock},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
    common::{rolling_avg::RollingAverageTime, BanPeriod},
    proto::base_node::SyncBlocksRequest,
    transactions::aggregated_body::AggregateBody,
    validation::{BlockBodyValidator, ValidationError},
};

const LOG_TARGET: &str = "c::bn::block_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct BlockSynchronizer<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    connectivity: ConnectivityRequester,
    sync_peers: &'a mut Vec<SyncPeer>,
    block_validator: Arc<dyn BlockBodyValidator<B>>,
    hooks: Hooks,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        connectivity: ConnectivityRequester,
        sync_peers: &'a mut Vec<SyncPeer>,
        block_validator: Arc<dyn BlockBodyValidator<B>>,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            config,
            db,
            connectivity,
            sync_peers,
            block_validator,
            hooks: Default::default(),
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_progress_block_hook(hook);
    }

    pub fn on_complete<H>(&mut self, hook: H)
    where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
        self.hooks.add_on_complete_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
        let mut max_latency = self.config.initial_max_sync_latency;
        let mut sync_round = 0;
        let mut latency_increases_counter = 0;
        loop {
            match self.attempt_block_sync(max_latency).await {
                Ok(_) => return Ok(()),
                Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
                    warn!(target: LOG_TARGET, "{}", err);
                    max_latency += self.config.max_latency_increase;
                    warn!(
                        target: LOG_TARGET,
                        "Retrying block sync with increased max latency {:.2?} with {} sync peers",
                        max_latency,
                        self.sync_peers.len()
                    );
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                    // Prohibit using a few slow sync peers only, rather get new sync peers assigned
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    } else {
                        continue;
                    }
                },
                Err(err @ BlockSyncError::SyncRoundFailed) => {
                    sync_round += 1;
                    warn!(target: LOG_TARGET, "{} ({})", err, sync_round);
                    continue;
                },
                Err(err) => {
                    return Err(err);
                },
            }
        }
    }

    async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync blocks({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            let peer_index = self.get_sync_peer_index(&node_id).ok_or(BlockSyncError::PeerNotFound)?;
            let sync_peer = &self.sync_peers[peer_index];
            self.hooks.call_on_starting_hook(sync_peer);
            let mut conn = match self.connect_to_sync_peer(node_id.clone()).await {
                Ok(val) => val,
                Err(e) => {
                    warn!(
                        target: LOG_TARGET,
                        "Failed to connect to sync peer `{}`: {}", node_id, e
                    );
                    self.remove_sync_peer(&node_id);
                    continue;
                },
            };
            let config = RpcClient::builder()
                .with_deadline(self.config.rpc_deadline)
                .with_deadline_grace_period(Duration::from_secs(5));
            let mut client = match conn
                .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
                .await
            {
                Ok(val) => val,
                Err(e) => {
                    warn!(
                        target: LOG_TARGET,
                        "Failed to obtain RPC connection from sync peer `{}`: {}", node_id, e
                    );
                    self.remove_sync_peer(&node_id);
                    continue;
                },
            };
            let latency = client
                .get_last_request_latency()
                .expect("unreachable panic: last request latency must be set after connect");
            self.sync_peers[peer_index].set_latency(latency);
            let sync_peer = self.sync_peers[peer_index].clone();
            info!(
                target: LOG_TARGET,
                "Attempting to synchronize blocks with `{}` latency: {:.2?}", node_id, latency
            );
            match self.synchronize_blocks(sync_peer, client, max_latency).await {
                Ok(_) => return Ok(()),
                Err(err) => {
                    warn!(target: LOG_TARGET, "{}", err);
                    let ban_reason = BlockSyncError::get_ban_reason(&err);
                    if let Some(reason) = ban_reason {
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let BlockSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(BlockSyncError::NoMoreSyncPeers("Block sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(BlockSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(BlockSyncError::SyncRoundFailed)
        }
    }

    async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
        let connection = self.connectivity.dial_peer(peer).await?;
        Ok(connection)
    }

    #[allow(clippy::too_many_lines)]
    async fn synchronize_blocks(
        &mut self,
        mut sync_peer: SyncPeer,
        mut client: rpc::BaseNodeSyncRpcClient,
        max_latency: Duration,
    ) -> Result<(), BlockSyncError> {
        info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer.node_id());

        let tip_header = self.db.fetch_last_header().await?;
        let local_metadata = self.db.get_chain_metadata().await?;

        if tip_header.height <= local_metadata.best_block_height() {
            debug!(
                target: LOG_TARGET,
                "Blocks already synchronized to height {}.", tip_header.height
            );
            return Ok(());
        }

        let tip_hash = tip_header.hash();
        let tip_height = tip_header.height;
        let best_height = local_metadata.best_block_height();
        let chain_header = self.db.fetch_chain_header(best_height).await?;

        let best_full_block_hash = chain_header.accumulated_data().hash;
        debug!(
            target: LOG_TARGET,
            "Starting block sync from peer `{}`. Current best block is #{} `{}`. Syncing to #{} ({}).",
            sync_peer,
            best_height,
            best_full_block_hash.to_hex(),
            tip_height,
            tip_hash.to_hex()
        );
        let request = SyncBlocksRequest {
            start_hash: best_full_block_hash.to_vec(),
            // To the tip!
            end_hash: tip_hash.to_vec(),
        };

        let mut block_stream = client.sync_blocks(request).await?;
        let mut prev_hash = best_full_block_hash;
        let mut current_block = None;
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);
        while let Some(block_result) = block_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let block_body_response = block_result?;

            let header = self
                .db
                .fetch_chain_header_by_block_hash(block_body_response.hash.clone().try_into()?)
                .await?
                .ok_or_else(|| {
                    BlockSyncError::UnknownHeaderHash(format!(
                        "Peer sent hash ({}) for block header we do not have",
                        block_body_response.hash.to_hex()
                    ))
                })?;

            let current_height = header.height();
            let header_hash = *header.hash();
            let timestamp = header.timestamp();

            if header.header().prev_hash != prev_hash {
                return Err(BlockSyncError::BlockWithoutParent {
                    expected: prev_hash.to_hex(),
                    got: header.header().prev_hash.to_hex(),
                });
            }

            prev_hash = header_hash;

            let body = block_body_response
                .body
                .map(AggregateBody::try_from)
                .ok_or_else(|| BlockSyncError::InvalidBlockBody("Peer sent empty block".to_string()))?
                .map_err(BlockSyncError::InvalidBlockBody)?;

            debug!(
                target: LOG_TARGET,
                "Validating block body #{} (PoW = {}, {}, latency: {:.2?})",
                current_height,
                header.header().pow_algo(),
                body.to_counts_string(),
                latency
            );

            let timer = Instant::now();
            let (header, header_accum_data) = header.into_parts();
            let block = Block::new(header, body);

            // Validate the block inside a tokio task
            let task_block = block.clone();
            let db = self.db.inner().clone();
            let validator = self.block_validator.clone();
            let res = {
                let txn = db.db_read_access()?;
                validator.validate_body(&*txn, &task_block)
            };

            let block = match res {
                Ok(block) => block,
                Err(err @ ValidationError::BadBlockFound { .. }) | Err(err @ ValidationError::FatalStorageError(_)) => {
                    return Err(err.into());
                },
                Err(err) => {
                    // Add to bad blocks
                    if let Err(err) = self
                        .db
                        .write_transaction()
                        .delete_orphan(header_hash)
                        .insert_bad_block(header_hash, current_height, err.to_string())
                        .commit()
                        .await
                    {
                        error!(target: LOG_TARGET, "Failed to insert bad block: {}", err);
                    }
                    return Err(err.into());
                },
            };

            let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
                .map(Arc::new)
                .ok_or(BlockSyncError::FailedToConstructChainBlock)?;

            debug!(
                target: LOG_TARGET,
                "Validated in {:.0?}. Storing block body #{} (PoW = {}, {})",
                timer.elapsed(),
                block.header().height,
                block.header().pow_algo(),
                block.block().body.to_counts_string(),
            );
            trace!(
                target: LOG_TARGET,
                "{}",block
            );

            self.db
                .write_transaction()
                .delete_orphan(header_hash)
                .insert_tip_block_body(block.clone())
                .set_best_block(
                    block.height(),
                    header_hash,
                    block.accumulated_data().total_accumulated_difficulty,
                    block.header().prev_hash,
                    timestamp,
                )
                .commit()
                .await?;

            // Average time between receiving blocks from the peer - used to detect a slow sync peer
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
            if let Some(latency) = last_avg_latency {
                sync_peer.set_latency(latency);
            }
            // Includes time to add block to database, used to show blocks/s on status line
            sync_peer.add_sample(last_sync_timer.elapsed());
            self.hooks
                .call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);

            if let Some(avg_latency) = last_avg_latency {
                if avg_latency > max_latency {
                    return Err(BlockSyncError::MaxLatencyExceeded {
                        peer: sync_peer.node_id().clone(),
                        latency: avg_latency,
                        max_latency,
                    });
                }
            }

            current_block = Some(block);
            last_sync_timer = Instant::now();
        }
        debug!(
            "Sync peer claim at start  - height: {}, accumulated difficulty: {}",
            sync_peer.claimed_chain_metadata().best_block_height(),
            sync_peer.claimed_chain_metadata().accumulated_difficulty(),
        );
        debug!(
            "Our best header at start  - height: {}, accumulated difficulty: {}",
            best_height,
            chain_header.accumulated_data().total_accumulated_difficulty,
        );
        let metadata_after_sync = self.db.get_chain_metadata().await?;
        debug!(
            "Our best block after sync - height: {}, accumulated difficulty: {}",
            metadata_after_sync.best_block_height(),
            metadata_after_sync.accumulated_difficulty(),
        );

        if metadata_after_sync.accumulated_difficulty() < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
            return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
                "Their claim - height: {}, accumulated difficulty: {}. Our status after block sync - height: {}, \
                 accumulated difficulty: {}",
                sync_peer.claimed_chain_metadata().best_block_height(),
                sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                metadata_after_sync.best_block_height(),
                metadata_after_sync.accumulated_difficulty(),
            )));
        }

        if let Some(block) = current_block {
            self.hooks.call_on_complete_hooks(block, best_height);
        }

        debug!(target: LOG_TARGET, "Completed block sync with peer `{}`", sync_peer);

        Ok(())
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index to the node_id inside of the vec of peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }
}
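
The retry policy in `synchronize` can be hard to follow inside the async loop, so here is a minimal, standalone sketch of just the decision logic. It is a simplification under stated assumptions, not the project's implementation: `RoundOutcome`, `Verdict`, and `run_rounds` are hypothetical names, each round's result is fed in as plain data instead of coming from `attempt_block_sync`, and the peer count is held fixed, whereas the real code re-reads `self.sync_peers.len()` every round.

```rust
use std::time::Duration;

/// Mirrors the MAX_LATENCY_INCREASES constant in the listing.
const MAX_LATENCY_INCREASES: usize = 5;

/// Hypothetical, simplified result of one sync round.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoundOutcome {
    Synced,
    AllPeersTooSlow,
    RoundFailed,
    Fatal,
}

/// Hypothetical overall result of the retry loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verdict {
    Done,
    GaveUp,
}

/// Replays a sequence of round outcomes and returns the verdict together
/// with the latency limit in force when the loop stopped.
pub fn run_rounds<I>(
    outcomes: I,
    initial_max_latency: Duration,
    latency_step: Duration,
    peer_count: usize, // assumption: fixed for the whole run
) -> (Verdict, Duration)
where
    I: IntoIterator<Item = RoundOutcome>,
{
    let mut max_latency = initial_max_latency;
    let mut latency_increases = 0usize;
    for outcome in outcomes {
        match outcome {
            RoundOutcome::Synced => return (Verdict::Done, max_latency),
            RoundOutcome::AllPeersTooSlow => {
                // Relax the limit first, then check whether retrying is still allowed.
                max_latency += latency_step;
                latency_increases += 1;
                if latency_increases > MAX_LATENCY_INCREASES || peer_count < 2 {
                    return (Verdict::GaveUp, max_latency);
                }
            },
            // A failed round is retried with the same latency limit.
            RoundOutcome::RoundFailed => continue,
            RoundOutcome::Fatal => return (Verdict::GaveUp, max_latency),
        }
    }
    // Ran out of recorded outcomes without a successful round.
    (Verdict::GaveUp, max_latency)
}
```

For example, with a 10 s initial limit and a 5 s step, two `AllPeersTooSlow` rounds followed by `Synced` end with `Verdict::Done` at a 20 s limit, while a single slow round with only one sync peer gives up immediately.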