tari-project/tari, build 15326284040

29 May 2025 02:33PM UTC, coverage: 72.791% (-0.8%) from 73.545%

Push event from GitHub (web-flow)

fix: peer retention and connections (#7123)

Description
---
- Fixed peer retention in the database by improving the "delete all stale
peers" logic.
- Improved error responses when peers cannot be found in the database.
- Improved synchronisation of the DHT connection pools with the peer
database.
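
The description does not show how the stale-peer deletion decides what to keep. As a rough illustration only, here is one common shape for retention logic of this kind: delete peers that have not been seen within a maximum age, but never delete a peer that is still in use (connected, or referenced by a DHT pool). `PeerRecord`, `delete_stale_peers`, the `u64` node IDs and the `protected` set are all hypothetical and are not Tari's peer-manager API.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical peer record; a real peer entry carries far more state.
#[derive(Debug, Clone)]
struct PeerRecord {
    last_seen_secs: u64, // Unix time of the last successful contact
}

/// Hypothetical helper: removes peers not seen within `max_age_secs`, except
/// peers listed in `protected` (e.g. currently connected or pooled peers).
/// Returns how many peers were removed.
fn delete_stale_peers(
    peers: &mut HashMap<u64, PeerRecord>,
    now_secs: u64,
    max_age_secs: u64,
    protected: &HashSet<u64>,
) -> usize {
    let before = peers.len();
    peers.retain(|node_id, peer| {
        let is_fresh = now_secs.saturating_sub(peer.last_seen_secs) <= max_age_secs;
        // Keep fresh peers, and keep stale peers that something still relies on.
        is_fresh || protected.contains(node_id)
    });
    before - peers.len()
}
```
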

Motivation and Context
---
The DHT neighbour and random peer pools were out of sync with the actual
connections and with the peers stored in the peer database.

Issue: why `PeerManagerError(PeerNotFoundError)`?
```
2025-05-28 04:49:55.101009900 [comms::dht::connectivity] ERROR Error refreshing neighbour peer pool: PeerManagerError(PeerNotFoundError)
2025-05-28 04:49:55.101120100 [comms::connectivity::manager] TRACE Request (14743808475136314793): GetAllowList(Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) })
2025-05-28 04:49:55.101160300 [comms::connectivity::manager] TRACE Request (14743808475136314793) done
2025-05-28 04:49:55.104823200 [comms::dht::connectivity] ERROR Error refreshing random peer pool: PeerManagerError(PeerNotFoundError)
```
Issue: why `0 connected` while `active DHT connections: 10/12`?
```
2025-05-28 04:49:55.104929100 [comms::dht::connectivity] DEBUG DHT connectivity status: neighbour pool: 8/8 (0 connected), random pool: 4/4 (0 connected, last refreshed 12777s ago), active DHT connections: 10/12
```
Issue: why `Inbound pipeline returned an error: 'The requested peer does not exist'`?
```
2025-05-28 10:42:21.447513700 [comms::pipeline::inbound] WARN  Inbound pipeline returned an error: 'The requested peer does not exist'
```
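
The status line above shows both pools full (`8/8`, `4/4`) with `0 connected`, while the node reports 10 active DHT connections, so the pools hold peers that no longer match the live connections. A minimal sketch of one way to reconcile a pool, assuming a pool is just a list of node IDs and that the peer database and live connections can be queried as sets: entries missing from the peer database are dropped instead of failing the whole refresh with a "peer not found" error, and the connected count is recomputed. `refresh_pool` and `PoolStatus` are hypothetical names, not the code changed in #7123.

```rust
use std::collections::HashSet;

/// Hypothetical summary in the shape of the log line `neighbour pool: 8/8 (0 connected)`.
#[derive(Debug, PartialEq)]
struct PoolStatus {
    size: usize,
    capacity: usize,
    connected: usize,
}

/// Hypothetical refresh: prune pool entries unknown to the peer DB, then count
/// how many of the remaining entries have a live connection.
fn refresh_pool(
    pool: &mut Vec<u64>,
    capacity: usize,
    peer_db: &HashSet<u64>,
    connected: &HashSet<u64>,
) -> PoolStatus {
    // Skip (drop) missing peers rather than aborting the refresh on the first miss.
    pool.retain(|id| peer_db.contains(id));
    let connected_count = pool.iter().filter(|id| connected.contains(*id)).count();
    PoolStatus { size: pool.len(), capacity, connected: connected_count }
}
```
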
How Has This Been Tested?
---
- Adapted the unit test for the improved "delete all stale peers" logic.
- System-level testing [**TBD**]

What process can a PR reviewer use to test or verify this change?
---
- Code review.
- System-level testing


163 of 194 new or added lines in 11 files covered. (84.02%)

1028 existing lines in 27 files now uncovered.

82035 of 112699 relevant lines covered (72.79%)

252963.2 hits per line
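
The summary figures are plain ratios, and the `-0.8%` is the difference in percentage points between the two build totals (72.791 - 73.545 = -0.754, shown rounded to one decimal place). A quick check of the arithmetic; the `percent` helper exists only for this illustration:

```rust
/// Share of `covered` out of `total`, expressed as a percentage.
fn percent(covered: u64, total: u64) -> f64 {
    covered as f64 / total as f64 * 100.0
}
```
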

Source file: `/base_layer/core/src/base_node/state_machine_service/state_machine.rs` (coverage: 60.89%)

```rust
// Copyright 2019. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
use std::{future::Future, sync::Arc, time::Duration};

use futures::{future, future::Either};
use log::*;
use randomx_rs::RandomXFlag;
use serde::{Deserialize, Serialize};
use tari_common::configuration::serializers;
use tari_comms::connectivity::ConnectivityRequester;
use tari_comms_dht::event::DhtEventReceiver;
use tari_shutdown::ShutdownSignal;
use tokio::sync::{broadcast, watch};

use crate::{
    base_node::{
        chain_metadata_service::ChainMetadataEvent,
        comms_interface::LocalNodeCommsInterface,
        state_machine_service::{
            states,
            states::{BaseNodeState, HeaderSyncState, StateEvent, StateInfo, StatusInfo, SyncStatus},
        },
        sync::{BlockchainSyncConfig, SyncValidators},
    },
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
    consensus::ConsensusManager,
    proof_of_work::randomx_factory::RandomXFactory,
};
const LOG_TARGET: &str = "c::bn::base_node";

/// Configuration for the BaseNodeStateMachine.
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct BaseNodeStateMachineConfig {
    pub blockchain_sync_config: BlockchainSyncConfig,
    /// The number of blocks this node can be behind a peer before it is considered to be lagging (to test block
    /// propagation by delaying lagging).
    pub blocks_behind_before_considered_lagging: u64,
    /// The amount of time this node can know about a stronger chain before it is considered to be lagging.
    /// This gives the node time to receive the block via propagation, which is usually less network
    /// intensive. Be careful not to set this higher than the block time, which could cause the node
    /// to always be behind the network.
    #[serde(with = "serializers::seconds")]
    pub time_before_considered_lagging: Duration,
    /// The number of metadata events that a node will wait for before deciding to start syncing from a peer,
    /// choosing the best peer out of the list.
    pub initial_sync_peer_count: u64,
}

#[allow(clippy::derivable_impls)]
impl Default for BaseNodeStateMachineConfig {
    fn default() -> Self {
        Self {
            blockchain_sync_config: Default::default(),
            blocks_behind_before_considered_lagging: 1,
            time_before_considered_lagging: Duration::from_secs(10),
            initial_sync_peer_count: 5,
        }
    }
}
```
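
The two lagging thresholds in `BaseNodeStateMachineConfig` are described only in prose. The sketch below shows one plausible way to combine them, using the defaults from `Default` (1 block, 10 seconds). `LagConfig` and `is_lagging` are hypothetical, the sketch compares plain block heights rather than chain strength, and the real decision in the listening state may combine the thresholds differently.

```rust
use std::time::Duration;

/// Hypothetical stand-in for the two lagging-related config fields.
struct LagConfig {
    blocks_behind_before_considered_lagging: u64,
    time_before_considered_lagging: Duration,
}

/// Hypothetical rule: lagging only when we are more than the allowed number of
/// blocks behind AND have known about the stronger chain for the full grace
/// period (giving block propagation a chance to catch us up first).
fn is_lagging(
    cfg: &LagConfig,
    local_height: u64,
    peer_height: u64,
    stronger_chain_known_for: Duration,
) -> bool {
    let blocks_behind = peer_height.saturating_sub(local_height);
    blocks_behind > cfg.blocks_behind_before_considered_lagging
        && stronger_chain_known_for >= cfg.time_before_considered_lagging
}
```

With the defaults, being one block behind never counts as lagging, and a larger gap only counts once it has persisted for 10 seconds.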

```rust
/// A Minotari full node, aka Base Node.
///
/// This service is essentially a finite state machine that synchronises its blockchain state with its peers and
/// then listens for new blocks to add to the blockchain. See the [SynchronizationState] documentation for more details.
///
/// This struct holds fields that will be used by all the various FSM state instances, including the local blockchain
/// database and hooks to the p2p network
pub struct BaseNodeStateMachine<B: BlockchainBackend> {
    pub(super) db: AsyncBlockchainDb<B>,
    pub(super) local_node_interface: LocalNodeCommsInterface,
    pub(super) connectivity: ConnectivityRequester,
    pub(super) metadata_event_stream: broadcast::Receiver<Arc<ChainMetadataEvent>>,
    pub(super) dht_event_stream: DhtEventReceiver,
    pub(super) config: BaseNodeStateMachineConfig,
    pub(super) info: StateInfo,
    pub(super) sync_validators: SyncValidators<B>,
    pub(super) consensus_rules: ConsensusManager,
    pub(super) status_event_sender: Arc<watch::Sender<StatusInfo>>,
    pub(super) randomx_factory: RandomXFactory,
    is_bootstrapped: bool,
    pub(super) is_primary_bootstrap_complete: bool,
    event_publisher: broadcast::Sender<Arc<StateEvent>>,
    interrupt_signal: ShutdownSignal,
}
```
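
The doc comment describes the base node as a finite state machine, and `transition` in the `impl` block that follows implements it as one `match` over `(state, event)` pairs, falling back to the current state (with a warning) for any pair it does not list. A minimal, self-contained sketch of that pattern: `State` and `Event` are hypothetical, payload-free stand-ins for `BaseNodeState` and `StateEvent`, with the horizon and block sync states and the database flags left out.

```rust
/// Toy states, loosely modelled on `BaseNodeState`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Starting,
    Listening,
    HeaderSync,
    DecideNextSync,
    Waiting,
    Shutdown,
}

/// Toy events, loosely modelled on `StateEvent`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Event {
    Initialized,
    FallenBehind,
    HeaderSyncFailed,
    HeadersSynchronized,
    Continue,
    UserQuit,
}

/// Every listed (state, event) pair maps to a next state; anything else keeps the current state.
fn transition(state: State, event: Event) -> State {
    use Event::*;
    use State::*;
    match (state, event) {
        (Starting, Initialized) => Listening,
        (Listening, FallenBehind) => HeaderSync,
        (HeaderSync, HeaderSyncFailed) => Waiting,
        (HeaderSync, HeadersSynchronized) => DecideNextSync,
        (HeaderSync, Continue) | (DecideNextSync, Continue) | (Waiting, Continue) => Listening,
        // Quitting is valid from any state, so it must come before the catch-all.
        (_, UserQuit) => Shutdown,
        // No transition defined: stay where we are.
        (s, _) => s,
    }
}
```

Keeping every transition in one `match` makes the full state graph reviewable in one place, and the catch-all arm makes unexpected events harmless rather than fatal.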

```rust
impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
    // Instantiate a new Base Node.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        db: AsyncBlockchainDb<B>,
        local_node_interface: LocalNodeCommsInterface,
        connectivity: ConnectivityRequester,
        metadata_event_stream: broadcast::Receiver<Arc<ChainMetadataEvent>>,
        dht_event_stream: DhtEventReceiver,
        config: BaseNodeStateMachineConfig,
        sync_validators: SyncValidators<B>,
        status_event_sender: watch::Sender<StatusInfo>,
        event_publisher: broadcast::Sender<Arc<StateEvent>>,
        randomx_factory: RandomXFactory,
        consensus_rules: ConsensusManager,
        interrupt_signal: ShutdownSignal,
    ) -> Self {
        Self {
            db,
            local_node_interface,
            connectivity,
            metadata_event_stream,
            dht_event_stream,
            config,
            info: StateInfo::StartUp,
            event_publisher,
            status_event_sender: Arc::new(status_event_sender),
            sync_validators,
            randomx_factory,
            is_bootstrapped: false,
            is_primary_bootstrap_complete: false,
            consensus_rules,
            interrupt_signal,
        }
    }

    /// Describe the Finite State Machine for the base node. This function describes _every possible_ state
    /// transition for the node given its current state and an event that gets triggered.
    pub fn transition(&self, state: BaseNodeState, event: StateEvent) -> BaseNodeState {
        let db = self.db.inner();
        #[allow(clippy::enum_glob_use)]
        use self::{BaseNodeState::*, StateEvent::*, SyncStatus::Lagging};
        match (state, event) {
            (Starting(s), Initialized(network_silence)) => Listening(s.into(), network_silence),
            (
                Listening(..),
                FallenBehind(Lagging {
                    local: local_metadata,
                    sync_peers,
                    ..
                }),
            ) => {
                db.set_disable_add_block_flag();
                HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
            },
            (HeaderSync(s), HeaderSyncFailed(_err)) => {
                db.clear_disable_add_block_flag();
                Waiting(s.into())
            },
            (HeaderSync(s), Continue | NetworkSilence) => {
                db.clear_disable_add_block_flag();
                Listening(s.into(), false)
            },
            (HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),

            (DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
            (DecideNextSync(s), Continue) => {
                db.clear_disable_add_block_flag();
                Listening(s.into(), false)
            },
            (HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
            (HorizonStateSync(s), HorizonStateSyncFailure) => {
                db.clear_disable_add_block_flag();
                Waiting(s.into())
            },

            (DecideNextSync(_), ProceedToBlockSync(peers)) => BlockSync(peers.into()),
            (BlockSync(s), BlocksSynchronized) => {
                db.clear_disable_add_block_flag();
                Listening(s.into(), false)
            },
            (BlockSync(s), BlockSyncFailed) => {
                db.clear_disable_add_block_flag();
                Waiting(s.into())
            },

            (Waiting(s), Continue) => Listening(s.into(), false),
            (_, FatalError(s)) => panic!("{}", s),
            (_, UserQuit) => Shutdown(states::Shutdown::with_reason("Shutdown initiated by user".to_string())),
            (s, e) => {
                warn!(
                    target: LOG_TARGET,
                    "No state transition occurs for event {:?} in state {}", e, s
                );
                s
            },
        }
    }

    /// Publishes the current `StatusInfo` to the status watch channel.
    pub fn publish_event_info(&self) {
        let status = StatusInfo {
            bootstrapped: self.is_bootstrapped(),
            state_info: self.info.clone(),
            randomx_vm_cnt: self.randomx_factory.get_count().unwrap_or(0),
            randomx_vm_flags: self.randomx_factory.get_flags().unwrap_or_default(),
        };

        if let Err(e) = self.status_event_sender.send(status) {
            debug!(target: LOG_TARGET, "Error broadcasting a StatusEvent update: {}", e);
        }
    }

    /// Sets the `StateInfo`, marks the node as bootstrapped the first time it is synced, and publishes the updated
    /// `StatusInfo`.
    pub fn set_state_info(&mut self, info: StateInfo) {
        self.info = info;
        if self.info.is_synced() && !self.is_bootstrapped {
            debug!(target: LOG_TARGET, "Node has bootstrapped");
            self.is_bootstrapped = true;
        }
        self.publish_event_info();
    }

    pub fn is_bootstrapped(&self) -> bool {
        self.is_bootstrapped
    }

    pub fn get_randomx_vm_cnt(&self) -> usize {
        self.randomx_factory.get_count().unwrap_or_default()
    }

    pub fn get_randomx_vm_flags(&self) -> RandomXFlag {
        self.randomx_factory.get_flags().unwrap_or_default()
    }

    /// Start the base node runtime.
    pub async fn run(mut self) {
        use BaseNodeState::{Shutdown, Starting};
        let mut state = Starting(states::Starting);
        loop {
            if let Shutdown(reason) = &state {
                info!(
                    target: LOG_TARGET,
                    "Base Node state machine is shutting down because {}", reason
                );
                break;
            }

            let interrupt_signal = self.get_interrupt_signal();
            let next_state_future = self.next_state_event(&mut state);

            // Get the next `StateEvent`, returning a `UserQuit` state event if the interrupt signal is triggered
            let next_event = select_next_state_event(interrupt_signal, next_state_future).await;
            // Publish the event on the event bus
            let _size = self.event_publisher.send(Arc::new(next_event.clone()));
            trace!(
                target: LOG_TARGET,
                "Base Node event in State [{}]:  {}",
                state,
                next_event
            );
            state = self.transition(state, next_event);
        }
    }

    /// Processes and returns the next `StateEvent`
    async fn next_state_event(&mut self, state: &mut BaseNodeState) -> StateEvent {
        #[allow(clippy::enum_glob_use)]
        use states::BaseNodeState::*;
        let shared_state = self;
        match state {
            Starting(s) => s.next_event(shared_state).await,
            HeaderSync(s) => s.next_event(shared_state).await,
            DecideNextSync(s) => s.next_event(shared_state).await,
            HorizonStateSync(s) => s.next_event(shared_state).await,
            BlockSync(s) => s.next_event(shared_state).await,
            Listening(s, network_silence) => s.next_event(shared_state, *network_silence).await,
            Waiting(s) => s.next_event().await,
            Shutdown(_) => unreachable!("called get_next_state_event while in Shutdown state"),
        }
    }

    pub fn set_primary_bootstrap_complete(&mut self, complete: bool) {
        info!(
            target: LOG_TARGET,
            "[BN SM UPDATE] Setting primary_bootstrap_complete to {}. Was: {}. Current state: {}",
            complete,
            self.is_primary_bootstrap_complete,
            self.info.short_desc()
        );

        self.is_primary_bootstrap_complete = complete;

        if let StateInfo::Listening(mut info) = self.info.clone() {
            let had_bootstrap_phase = info.bootstrap_phase.is_some();
            if complete {
                info.bootstrap_phase = None;
            }
            self.set_state_info(StateInfo::Listening(info));

            info!(
                target: LOG_TARGET,
                "[BN SM UPDATE] Updated Listening state. Removed bootstrap_phase: {}. Console should now show 'Listening'",
                had_bootstrap_phase
            );
        } else {
            warn!(
                target: LOG_TARGET,
                "[BN SM UPDATE] Not in Listening state ({}), bootstrap_complete flag updated but UI unchanged",
                self.info.short_desc()
            );
        }
    }

    /// Returns a copy of the `interrupt_signal` for this node. This is a `ShutdownSignal` future that will be ready
    /// when the node enters a `Shutdown` state.
    pub fn get_interrupt_signal(&self) -> ShutdownSignal {
        self.interrupt_signal.clone()
    }
}
```
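
`set_state_info` treats `is_bootstrapped` as a one-way latch: the first `StateInfo` that reports `is_synced()` sets it, and nothing in this file clears it, so dropping back into a sync state later does not make the node report itself as un-bootstrapped. The sketch below reproduces only that logic with hypothetical toy types (`Info`, `Node`); instead of sending a `StatusInfo` over the `tokio::sync::watch` channel, it records each would-be update in a `published` vector so the behaviour is easy to inspect.

```rust
/// Toy stand-in for `StateInfo`: only "is this state synced?" matters here.
#[derive(Debug, Clone, PartialEq)]
enum Info {
    StartUp,
    HeaderSync,
    Listening { synced: bool },
}

impl Info {
    fn is_synced(&self) -> bool {
        matches!(self, Info::Listening { synced: true })
    }
}

/// Toy stand-in for the state machine's bootstrap bookkeeping.
struct Node {
    info: Info,
    is_bootstrapped: bool,
    published: Vec<(bool, Info)>, // what would have been sent as status updates
}

impl Node {
    fn set_state_info(&mut self, info: Info) {
        self.info = info;
        // One-way latch: the first synced state sets the flag; nothing resets it.
        if self.info.is_synced() && !self.is_bootstrapped {
            self.is_bootstrapped = true;
        }
        // Stand-in for publish_event_info(): record the (bootstrapped, state) pair.
        self.published.push((self.is_bootstrapped, self.info.clone()));
    }
}
```
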

```rust
/// Polls both the interrupt signal and the given future. If `state_fut` completes first, its value is returned;
/// if the interrupt signal is triggered first, `StateEvent::UserQuit` is returned.
async fn select_next_state_event<F, I>(interrupt_signal: I, state_fut: F) -> StateEvent
where
    F: Future<Output = StateEvent>,
    I: Future<Output = ()>,
{
    futures::pin_mut!(state_fut);
    futures::pin_mut!(interrupt_signal);
    // If futures A and B are both ready, `future::select` prefers A (here, the interrupt signal)
    match future::select(interrupt_signal, state_fut).await {
        Either::Left(_) => StateEvent::UserQuit,
        Either::Right((state, _)) => state,
    }
}
```
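
As the code comment notes, `future::select` prefers its first argument when both futures are ready, so an interrupt that has already fired wins even if the state future has also completed. A std-only sketch of that left-biased check, polling each future once with a no-op waker. `Event`, `noop_waker` and `select_once` are illustrative names; this is not how the `futures` crate implements `select`, and it polls only once instead of awaiting.

```rust
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Simplified stand-in for `StateEvent`.
#[derive(Debug, PartialEq)]
enum Event {
    Continue,
    UserQuit,
}

/// A waker that does nothing; sufficient for polling each future exactly once.
fn noop_waker() -> Waker {
    fn noop_clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
    }
    fn noop(_: *const ()) {}
    static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) }
}

/// Polls `interrupt` first and `state` second, once each (a left-biased select).
fn select_once<I, S>(interrupt: I, state: S) -> Poll<Event>
where
    I: Future<Output = ()>,
    S: Future<Output = Event>,
{
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut interrupt = Box::pin(interrupt);
    let mut state = Box::pin(state);
    // Checking the interrupt first is what makes it win when both are ready.
    if interrupt.as_mut().poll(&mut cx).is_ready() {
        return Poll::Ready(Event::UserQuit);
    }
    state.as_mut().poll(&mut cx)
}
```

Checking shutdown first means a node asked to quit will not start processing one more state event just because that event happened to be ready at the same moment.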