• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16990089413

15 Aug 2025 12:36PM UTC coverage: 54.497% (+0.06%) from 54.441%
16990089413

push

github

web-flow
chore: cleanup indexes (#7411)

Description
---
Forces clean indexs

970 of 2919 new or added lines in 369 files covered. (33.23%)

60 existing lines in 33 files now uncovered.

76698 of 140739 relevant lines covered (54.5%)

193749.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/state_machine_service/state_machine.rs
1
// Copyright 2019. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{future::Future, sync::Arc, time::Duration};
23

24
use futures::{future, future::Either};
25
use log::*;
26
use randomx_rs::RandomXFlag;
27
use serde::{Deserialize, Serialize};
28
use tari_common::configuration::serializers;
29
use tari_comms::{connectivity::ConnectivityRequester, PeerManager};
30
use tari_comms_dht::event::DhtEventReceiver;
31
use tari_shutdown::ShutdownSignal;
32
use tokio::sync::{broadcast, watch};
33

34
use crate::{
35
    base_node::{
36
        chain_metadata_service::ChainMetadataEvent,
37
        comms_interface::LocalNodeCommsInterface,
38
        state_machine_service::{
39
            states,
40
            states::{BaseNodeState, HeaderSyncState, StateEvent, StateInfo, StatusInfo, SyncStatus},
41
        },
42
        sync::{BlockchainSyncConfig, SyncValidators},
43
    },
44
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
45
    consensus::ConsensusManager,
46
    proof_of_work::randomx_factory::RandomXFactory,
47
};
48
const LOG_TARGET: &str = "c::bn::base_node";
49

50
/// Configuration for the BaseNodeStateMachine.
51
#[derive(Clone, Serialize, Deserialize, Debug)]
×
52
#[serde(deny_unknown_fields)]
53
pub struct BaseNodeStateMachineConfig {
54
    pub blockchain_sync_config: BlockchainSyncConfig,
55
    /// The maximum amount of VMs that RandomX will be use
56
    /// The amount of blocks this node can be behind a peer before considered to be lagging (to test the block
57
    /// propagation by delaying lagging)
58
    pub blocks_behind_before_considered_lagging: u64,
59
    /// The amount of time this node can know about a stronger chain before considered to be lagging.
60
    /// This is to give a node time to receive the block via propagation, which is usually less network
61
    /// intensive. Be careful of setting this higher than the block time, which would potentially cause it
62
    /// to always be behind the network
63
    #[serde(with = "serializers::seconds")]
64
    pub time_before_considered_lagging: Duration,
65
    /// This is the amount of metadata events that a node will wait for before decide to start syncing for a peer,
66
    /// choosing the best peer out of the list
67
    pub initial_sync_peer_count: u64,
68
}
69

70
#[allow(clippy::derivable_impls)]
71
impl Default for BaseNodeStateMachineConfig {
72
    fn default() -> Self {
×
73
        Self {
×
74
            blockchain_sync_config: Default::default(),
×
75
            blocks_behind_before_considered_lagging: 1,
×
76
            time_before_considered_lagging: Duration::from_secs(10),
×
77
            initial_sync_peer_count: 5,
×
78
        }
×
79
    }
×
80
}
81

82
/// A Minotari full node, aka Base Node.
83
///
84
/// This service is essentially a finite state machine that synchronises its blockchain state with its peers and
85
/// then listens for new blocks to add to the blockchain. See the [SynchronizationState] documentation for more details.
86
///
87
/// This struct holds fields that will be used by all the various FSM state instances, including the local blockchain
88
/// database and hooks to the p2p network
89
pub struct BaseNodeStateMachine<B: BlockchainBackend> {
90
    pub(super) db: AsyncBlockchainDb<B>,
91
    pub(super) local_node_interface: LocalNodeCommsInterface,
92
    pub(super) connectivity: ConnectivityRequester,
93
    pub(super) metadata_event_stream: broadcast::Receiver<Arc<ChainMetadataEvent>>,
94
    pub(super) peer_manager: Arc<PeerManager>,
95
    pub(super) dht_event_stream: DhtEventReceiver,
96
    pub(super) config: BaseNodeStateMachineConfig,
97
    pub(super) info: StateInfo,
98
    pub(super) sync_validators: SyncValidators<B>,
99
    pub(super) consensus_rules: ConsensusManager,
100
    pub(super) status_event_sender: Arc<watch::Sender<StatusInfo>>,
101
    pub(super) randomx_factory: RandomXFactory,
102
    is_bootstrapped: bool,
103
    pub(super) is_primary_bootstrap_complete: bool,
104
    event_publisher: broadcast::Sender<Arc<StateEvent>>,
105
    interrupt_signal: ShutdownSignal,
106
}
107

108
impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
109
    // Instantiate a new Base Node.
110
    #[allow(clippy::too_many_arguments)]
111
    pub fn new(
×
112
        db: AsyncBlockchainDb<B>,
×
113
        local_node_interface: LocalNodeCommsInterface,
×
114
        connectivity: ConnectivityRequester,
×
115
        metadata_event_stream: broadcast::Receiver<Arc<ChainMetadataEvent>>,
×
116
        peer_manager: Arc<PeerManager>,
×
117
        dht_event_stream: DhtEventReceiver, // New parameter
×
118
        config: BaseNodeStateMachineConfig,
×
119
        sync_validators: SyncValidators<B>,
×
120
        status_event_sender: watch::Sender<StatusInfo>,
×
121
        event_publisher: broadcast::Sender<Arc<StateEvent>>,
×
122
        randomx_factory: RandomXFactory,
×
123
        consensus_rules: ConsensusManager,
×
124
        interrupt_signal: ShutdownSignal,
×
125
    ) -> Self {
×
126
        Self {
×
127
            db,
×
128
            local_node_interface,
×
129
            connectivity,
×
130
            metadata_event_stream,
×
131
            peer_manager,
×
132
            dht_event_stream,
×
133
            config,
×
134
            info: StateInfo::StartUp,
×
135
            event_publisher,
×
136
            status_event_sender: Arc::new(status_event_sender),
×
137
            sync_validators,
×
138
            randomx_factory,
×
139
            is_bootstrapped: false,
×
140
            is_primary_bootstrap_complete: false,
×
141
            consensus_rules,
×
142
            interrupt_signal,
×
143
        }
×
144
    }
×
145

146
    /// Describe the Finite State Machine for the base node. This function describes _every possible_ state
147
    /// transition for the node given its current state and an event that gets triggered.
148
    pub fn transition(&self, state: BaseNodeState, event: StateEvent) -> BaseNodeState {
×
149
        let db = self.db.inner();
×
150
        #[allow(clippy::enum_glob_use)]
151
        use self::{BaseNodeState::*, StateEvent::*, SyncStatus::Lagging};
152
        match (state, event) {
×
153
            (Starting(s), Initialized(network_silence)) => Listening(s.into(), network_silence),
×
154
            (
155
                Listening(..),
156
                FallenBehind(Lagging {
157
                    local: local_metadata,
×
158
                    sync_peers,
×
159
                    ..
×
160
                }),
×
161
            ) => {
×
162
                db.set_disable_add_block_flag();
×
163
                HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
×
164
            },
165
            (HeaderSync(s), HeaderSyncFailed(_err)) => {
×
166
                db.clear_disable_add_block_flag();
×
167
                Waiting(s.into())
×
168
            },
169
            (HeaderSync(s), Continue | NetworkSilence) => {
×
170
                db.clear_disable_add_block_flag();
×
171
                Listening(s.into(), false)
×
172
            },
173
            (HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),
×
174

175
            (DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
×
176
            (DecideNextSync(s), Continue) => {
×
177
                db.clear_disable_add_block_flag();
×
178
                Listening(s.into(), false)
×
179
            },
180
            (HorizonStateSync(s), HorizonStateSynchronized) => BlockSync(s.into()),
×
181
            (HorizonStateSync(s), HorizonStateSyncFailure) => {
×
182
                db.clear_disable_add_block_flag();
×
183
                Waiting(s.into())
×
184
            },
185

186
            (DecideNextSync(_), ProceedToBlockSync(peers)) => BlockSync(peers.into()),
×
187
            (BlockSync(s), BlocksSynchronized) => {
×
188
                db.clear_disable_add_block_flag();
×
189
                Listening(s.into(), false)
×
190
            },
191
            (BlockSync(s), BlockSyncFailed) => {
×
192
                db.clear_disable_add_block_flag();
×
193
                Waiting(s.into())
×
194
            },
195

196
            (Waiting(s), Continue) => Listening(s.into(), false),
×
197
            (_, FatalError(s)) => panic!("{}", s),
×
198
            (_, UserQuit) => Shutdown(states::Shutdown::with_reason("Shutdown initiated by user".to_string())),
×
199
            (s, e) => {
×
200
                warn!(
×
201
                    target: LOG_TARGET,
×
NEW
202
                    "No state transition occurs for event {e:?} in state {s}"
×
203
                );
204
                s
×
205
            },
206
        }
207
    }
×
208

209
    /// This function will publish the current StatusInfo to the channel
210
    pub fn publish_event_info(&self) {
×
211
        let status = StatusInfo {
×
212
            bootstrapped: self.is_bootstrapped(),
×
213
            state_info: self.info.clone(),
×
214
            randomx_vm_cnt: self.randomx_factory.get_count().unwrap_or(0),
×
215
            randomx_vm_flags: self.randomx_factory.get_flags().unwrap_or_default(),
×
216
        };
×
217

218
        if let Err(e) = self.status_event_sender.send(status) {
×
NEW
219
            debug!(target: LOG_TARGET, "Error broadcasting a StatusEvent update: {e}");
×
220
        }
×
221
    }
×
222

223
    /// Sets the StatusInfo.
224
    pub fn set_state_info(&mut self, info: StateInfo) {
×
225
        self.info = info;
×
226
        if self.info.is_synced() && !self.is_bootstrapped {
×
227
            debug!(target: LOG_TARGET, "Node has bootstrapped");
×
228
            self.is_bootstrapped = true;
×
229
        }
×
230
        self.publish_event_info();
×
231
    }
×
232

233
    pub fn is_bootstrapped(&self) -> bool {
×
234
        self.is_bootstrapped
×
235
    }
×
236

237
    pub fn get_randomx_vm_cnt(&self) -> usize {
×
238
        self.randomx_factory.get_count().unwrap_or_default()
×
239
    }
×
240

241
    pub fn get_randomx_vm_flags(&self) -> RandomXFlag {
×
242
        self.randomx_factory.get_flags().unwrap_or_default()
×
243
    }
×
244

245
    /// Start the base node runtime.
246
    pub async fn run(mut self) {
×
247
        use BaseNodeState::{Shutdown, Starting};
248
        let mut state = Starting(states::Starting);
×
249
        loop {
250
            if let Shutdown(reason) = &state {
×
251
                info!(
×
252
                    target: LOG_TARGET,
×
NEW
253
                    "Base Node state machine is shutting down because {reason}"
×
254
                );
255
                break;
×
256
            }
×
257

×
258
            let interrupt_signal = self.get_interrupt_signal();
×
259
            let next_state_future = self.next_state_event(&mut state);
×
260

261
            // Get the next `StateEvent`, returning a `UserQuit` state event if the interrupt signal is triggered
262
            let next_event = select_next_state_event(interrupt_signal, next_state_future).await;
×
263
            // Publish the event on the event bus
264
            let _size = self.event_publisher.send(Arc::new(next_event.clone()));
×
265
            trace!(
×
266
                target: LOG_TARGET,
×
NEW
267
                "Base Node event in State [{state}]:  {next_event}"
×
268
            );
269
            state = self.transition(state, next_event);
×
270
        }
271
    }
×
272

273
    /// Processes and returns the next `StateEvent`
274
    async fn next_state_event(&mut self, state: &mut BaseNodeState) -> StateEvent {
×
275
        #[allow(clippy::enum_glob_use)]
276
        use states::BaseNodeState::*;
277
        let shared_state = self;
×
278
        match state {
×
279
            Starting(s) => s.next_event(shared_state).await,
×
280
            HeaderSync(s) => s.next_event(shared_state).await,
×
281
            DecideNextSync(s) => s.next_event(shared_state).await,
×
282
            HorizonStateSync(s) => s.next_event(shared_state).await,
×
283
            BlockSync(s) => s.next_event(shared_state).await,
×
284
            Listening(s, network_silence) => s.next_event(shared_state, *network_silence).await,
×
285
            Waiting(s) => s.next_event().await,
×
286
            Shutdown(_) => unreachable!("called get_next_state_event while in Shutdown state"),
×
287
        }
288
    }
×
289

290
    pub fn set_primary_bootstrap_complete(&mut self, complete: bool) {
×
291
        info!(
×
292
            target: LOG_TARGET,
×
293
            "[BN SM UPDATE] Setting primary_bootstrap_complete to {}. Was: {}. Current state: {}",
×
294
            complete,
×
295
            self.is_primary_bootstrap_complete,
×
296
            self.info.short_desc()
×
297
        );
298

299
        self.is_primary_bootstrap_complete = complete;
×
300

301
        if let StateInfo::Listening(mut info) = self.info.clone() {
×
302
            let had_bootstrap_phase = info.bootstrap_phase.is_some();
×
303
            if complete {
×
304
                info.bootstrap_phase = None;
×
305
            }
×
306
            self.set_state_info(StateInfo::Listening(info));
×
307

×
308
            info!(
×
309
                target: LOG_TARGET,
×
NEW
310
                "[BN SM UPDATE] Updated Listening state. Removed bootstrap_phase: {had_bootstrap_phase}. Console should now show 'Listening'"
×
311
            );
312
        } else {
313
            debug!(
×
314
                target: LOG_TARGET,
×
315
                "[BN SM UPDATE] Not in Listening state ({}), bootstrap_complete flag updated but UI unchanged",
×
316
                self.info.short_desc()
×
317
            );
318
        }
319
    }
×
320

321
    /// Return a copy of the `interrupt_signal` for this node. This is a `ShutdownSignal` future that will be ready when
322
    /// the node will enter a `Shutdown` state.
323
    pub fn get_interrupt_signal(&self) -> ShutdownSignal {
×
324
        self.interrupt_signal.clone()
×
325
    }
×
326
}
327

328
/// Polls both the interrupt signal and the given future. If the given future `state_fut` is ready first it's value is
329
/// returned, otherwise if the interrupt signal is triggered, `StateEvent::UserQuit` is returned.
330
async fn select_next_state_event<F, I>(interrupt_signal: I, state_fut: F) -> StateEvent
×
331
where
×
332
    F: Future<Output = StateEvent>,
×
333
    I: Future<Output = ()>,
×
334
{
×
335
    futures::pin_mut!(state_fut);
×
336
    futures::pin_mut!(interrupt_signal);
×
337
    // If future A and B are both ready `future::select` will prefer A
×
338
    match future::select(interrupt_signal, state_fut).await {
×
339
        Either::Left(_) => StateEvent::UserQuit,
×
340
        Either::Right((state, _)) => state,
×
341
    }
342
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc