• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 15326284040

29 May 2025 02:33PM UTC coverage: 72.791% (-0.8%) from 73.545%
15326284040

push

github

web-flow
fix: peer retention and connections (#7123)

Description
---
- Fixed peer retention in the database by improving delete all stale
peers logic.
- Improved error responses when peers could not be found in the
database.
- Improved DHT connection pools sync with peer database.

Motivation and Context
---
DHT neighbour and random pools were not in sync with actual connections
and peers in the peer db.

Issue - Why `PeerManagerError(PeerNotFoundError)`
```
2025-05-28 04:49:55.101009900 [comms::dht::connectivity] ERROR Error refreshing neighbour peer pool: PeerManagerError(PeerNotFoundError)
2025-05-28 04:49:55.101120100 [comms::connectivity::manager] TRACE Request (14743808475136314793): GetAllowList(Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) })
2025-05-28 04:49:55.101160300 [comms::connectivity::manager] TRACE Request (14743808475136314793) done
2025-05-28 04:49:55.104823200 [comms::dht::connectivity] ERROR Error refreshing random peer pool: PeerManagerError(PeerNotFoundError)
```
Issue  -  Why `0 connected` but `active DHT connections: 10/12`
```
2025-05-28 04:49:55.104929100 [comms::dht::connectivity] DEBUG DHT connectivity status: neighbour pool: 8/8 (0 connected), random pool: 4/4 (0 connected, last refreshed 12777s ago), active DHT connections: 10/12
```
Issue - Why `Inbound pipeline returned an error: 'The requested peer
does not exist'`
```
2025-05-28 10:42:21.447513700 [comms::pipeline::inbound] WARN  Inbound pipeline returned an error: 'The requested peer does not exist'
```
How Has This Been Tested?
---
- Adapted unit test for improved delete all stale peers logic.
- System-level testing [**TBD**]

What process can a PR reviewer use to test or verify this change?
---
- Code review.
- System-level testing

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be ... (continued)

163 of 194 new or added lines in 11 files covered. (84.02%)

1028 existing lines in 27 files now uncovered.

82035 of 112699 relevant lines covered (72.79%)

252963.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/state_machine_service/initializer.rs
1
// Copyright 2020. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use log::*;
24
use tari_comms::connectivity::ConnectivityRequester;
25
use tari_comms_dht::Dht;
26
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
27
use tokio::sync::{broadcast, watch};
28

29
use crate::{
30
    base_node::{
31
        chain_metadata_service::ChainMetadataHandle,
32
        state_machine_service::{
33
            handle::StateMachineHandle,
34
            state_machine::{BaseNodeStateMachine, BaseNodeStateMachineConfig},
35
            states::StatusInfo,
36
        },
37
        sync::SyncValidators,
38
        LocalNodeCommsInterface,
39
    },
40
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
41
    consensus::ConsensusManager,
42
    proof_of_work::randomx_factory::RandomXFactory,
43
    transactions::CryptoFactories,
44
};
45

46
const LOG_TARGET: &str = "c::bn::state_machine_service::initializer";
47

48
pub struct BaseNodeStateMachineInitializer<B> {
49
    db: AsyncBlockchainDb<B>,
50
    config: BaseNodeStateMachineConfig,
51
    rules: ConsensusManager,
52
    factories: CryptoFactories,
53
    randomx_factory: RandomXFactory,
54
    bypass_range_proof_verification: bool,
55
}
56

57
impl<B> BaseNodeStateMachineInitializer<B>
58
where B: BlockchainBackend + 'static
59
{
UNCOV
60
    pub fn new(
×
61
        db: AsyncBlockchainDb<B>,
×
62
        config: BaseNodeStateMachineConfig,
×
63
        rules: ConsensusManager,
×
64
        factories: CryptoFactories,
×
65
        randomx_factory: RandomXFactory,
×
66
        bypass_range_proof_verification: bool,
×
67
    ) -> Self {
×
68
        Self {
×
69
            db,
×
70
            config,
×
71
            rules,
×
72
            factories,
×
73
            randomx_factory,
×
74
            bypass_range_proof_verification,
×
75
        }
×
76
    }
×
77
}
78

79
#[async_trait]
80
impl<B> ServiceInitializer for BaseNodeStateMachineInitializer<B>
81
where B: BlockchainBackend + 'static
82
{
UNCOV
83
    async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
×
84
        trace!(target: LOG_TARGET, "Initializing Base Node State Machine Service");
×
85
        let (state_event_publisher, _) = broadcast::channel(500);
×
86
        let (status_event_sender, status_event_receiver) = watch::channel(StatusInfo::new());
×
87

×
88
        let handle = StateMachineHandle::new(
×
89
            state_event_publisher.clone(),
×
90
            status_event_receiver,
×
91
            context.get_shutdown_signal(),
×
92
        );
×
93
        context.register_handle(handle);
×
94

×
95
        let factories = self.factories.clone();
×
96
        let rules = self.rules.clone();
×
97
        let db = self.db.clone();
×
98
        let config = self.config.clone();
×
99
        let randomx_factory = self.randomx_factory.clone();
×
100
        let bypass_range_proof_verification = self.bypass_range_proof_verification;
×
101

×
102
        context.spawn_when_ready(move |handles| async move {
×
103
            let chain_metadata_service = handles.expect_handle::<ChainMetadataHandle>();
×
104
            let node_local_interface = handles.expect_handle::<LocalNodeCommsInterface>();
×
105
            let connectivity = handles.expect_handle::<ConnectivityRequester>();
×
106

×
107
            let sync_validators =
×
108
                SyncValidators::full_consensus(rules.clone(), factories, bypass_range_proof_verification);
×
109

×
110
            let dht = handles.expect_handle::<Dht>(); // Get Dht handle
×
111
            let node = BaseNodeStateMachine::new(
×
112
                db,
×
113
                node_local_interface,
×
114
                connectivity,
×
115
                chain_metadata_service.get_event_stream(),
×
116
                dht.subscribe_dht_events(), // Pass DhtEventReceiver
×
117
                config,
×
118
                sync_validators,
×
119
                status_event_sender,
×
120
                state_event_publisher,
×
121
                randomx_factory,
×
122
                rules,
×
123
                handles.get_shutdown_signal(),
×
124
            );
×
125
            node.run().await;
×
126
            info!(target: LOG_TARGET, "Base Node State Machine Service has shut down");
×
127
        });
×
128
        debug!(target: LOG_TARGET, "Base Node State Machine Service initialized");
×
129

130
        Ok(())
×
UNCOV
131
    }
×
132
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc