• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16990089413

15 Aug 2025 12:36PM UTC coverage: 54.497% (+0.06%) from 54.441%
16990089413

push

github

web-flow
chore: cleanup indexes (#7411)

Description
---
Forces clean indexes

970 of 2919 new or added lines in 369 files covered. (33.23%)

60 existing lines in 33 files now uncovered.

76698 of 140739 relevant lines covered (54.5%)

193749.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/wallet/src/base_node_service/monitor.rs
1
//  Copyright 2021, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::{sync::Arc, time::Duration};
23

24
use chrono::Utc;
25
use log::*;
26
use minotari_node_wallet_client::BaseNodeWalletClient;
27
use tari_common_types::types::FixedHash;
28
use tari_comms::protocol::rpc::RpcError;
29
use tari_shutdown::ShutdownSignal;
30
use tokio::{select, sync::RwLock, time::interval};
31

32
use crate::{
33
    base_node_service::{
34
        handle::{BaseNodeEvent, BaseNodeEventSender},
35
        service::BaseNodeState,
36
    },
37
    connectivity_service::WalletConnectivityInterface,
38
    error::WalletStorageError,
39
};
40

41
const LOG_TARGET: &str = "wallet::base_node_service::chain_metadata_monitor";
42

43
/// Background task state for monitoring a base node's chain tip on behalf of the wallet.
///
/// The monitor writes the latest observed [`BaseNodeState`] into a shared lock and publishes a
/// [`BaseNodeEvent`] each time it does so.
pub struct BaseNodeMonitor<TWalletConnectivity> {
44
    /// Shared, lock-protected copy of the most recently recorded base node state.
    state: Arc<RwLock<BaseNodeState>>,
45
    /// Used to obtain the base node wallet RPC client that is polled for tip information.
    wallet_connectivity: TWalletConnectivity,
46
    /// Sender on which state-change events are published.
    event_publisher: BaseNodeEventSender,
47
}
48

49
impl<TWalletConnectivity> BaseNodeMonitor<TWalletConnectivity>
50
where TWalletConnectivity: WalletConnectivityInterface
51
{
52
    /// Creates a monitor from its three collaborators; no work is started until `run` is called.
    ///
    /// * `state` - shared state that the monitor overwrites on every recorded update.
    /// * `wallet_connectivity` - source of the base node wallet RPC client.
    /// * `event_publisher` - sender used to publish `BaseNodeEvent`s.
    pub fn new(
×
53
        state: Arc<RwLock<BaseNodeState>>,
×
54
        wallet_connectivity: TWalletConnectivity,
×
55
        event_publisher: BaseNodeEventSender,
×
56
    ) -> Self {
×
57
        Self {
×
58
            state,
×
59
            wallet_connectivity,
×
60
            event_publisher,
×
61
        }
×
62
    }
×
63

64
    /// Runs the monitor to completion, consuming it.
    ///
    /// Awaits `monitor_node` exactly once and logs its outcome; there is no retry here, so the
    /// task ends as soon as `monitor_node` returns:
    /// * `Ok` - logged at debug level.
    /// * `RpcFailed` - logged at warn level, after which the shared state is reset to all-`None`
    ///   via `update_state` (which also publishes the change).
    /// * `InvalidBaseNodeResponse` / `WalletStorageError` - logged at error level; the shared
    ///   state is left as it was.
    ///
    /// NOTE(review): `monitor_node` as written only constructs `InvalidBaseNodeResponse`, so the
    /// `RpcFailed` arm (and its state reset) looks unreachable at present - confirm intent.
    pub async fn run(mut self, shutdown_signal: ShutdownSignal) {
×
65
        match self.monitor_node(shutdown_signal).await {
×
66
            Ok(_) => {
67
                debug!(
×
68
                    target: LOG_TARGET,
×
69
                    "Wallet Base Node Service chain metadata task completed successfully"
×
70
                );
71
            },
72

73
            Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
×
NEW
74
                warn!(target: LOG_TARGET, "Connectivity failure to base node: {e}");
×
75
                // Clear every field so readers of the shared state no longer see stale tip data.
                self.update_state(BaseNodeState {
×
76
                    chain_metadata: None,
×
77
                    is_synced: None,
×
78
                    updated: None,
×
79
                    latency: None,
×
80
                })
×
81
                .await;
×
82
            },
83
            Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) |
×
84
            Err(e @ BaseNodeMonitorError::WalletStorageError(_)) => {
×
NEW
85
                error!(target: LOG_TARGET, "{e}");
×
86
            },
87
        }
88
    }
×
89

90
    /// Polls the base node for its tip on a 10 second interval until shutdown or the first error.
    ///
    /// On every tick the function obtains an RPC client, fetches the tip info and the latency of
    /// the last request, and - only if the best block hash differs from the one recorded on the
    /// previous update - stores a fresh `BaseNodeState` via `update_state`.
    ///
    /// Returns `Ok(())` when `shutdown_signal` fires.
    ///
    /// # Errors
    /// Returns `BaseNodeMonitorError::InvalidBaseNodeResponse` if `get_tip_info` fails (the
    /// underlying error is stringified) or if the returned tip info carries no metadata.
    async fn monitor_node(&mut self, mut shutdown_signal: ShutdownSignal) -> Result<(), BaseNodeMonitorError> {
×
91
        let mut interval = interval(Duration::from_secs(10));
×
92
        // Ticks missed while a poll is in progress are skipped rather than fired in a burst.
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
×
93
        // Hash of the tip seen at the last state update; starts as the zero hash.
        let mut last_checked_hash = FixedHash::zero();
×
94

95
        loop {
96
            select! {
×
97

98
                        _ = shutdown_signal.wait() => {
×
99
                                return Ok(());
×
100
                        },
101
                        _ = interval.tick() => {
×
102
                            // timer fired: poll the base node for its current tip
103
                    let  client = self.wallet_connectivity.obtain_base_node_wallet_rpc_client().await;
×
104

105

106

107
                    // Any failure of the tip request is reported as an invalid response and ends the loop.
                    let tip_info = client
×
108
                        .get_tip_info()
×
109
                        .await
×
110
                        .map_err(|e| BaseNodeMonitorError::InvalidBaseNodeResponse(e.to_string()))?;
×
111
                    let chain_metadata = tip_info
×
112
                        .metadata
×
113
                        .ok_or_else(|| BaseNodeMonitorError::InvalidBaseNodeResponse("Tip info no metadata".to_string()))?;
×
114

115
                    let latency = match client.get_last_request_latency().await {
×
116
                        Some(latency) => latency,
×
117
                        None => {
118
                            // No latency reported: skip this tick without touching the shared state.
                            continue;
×
119
                        },
120
                    };
121
                    debug!(
×
122
                        target: LOG_TARGET,
×
123
                        "Base node height:{} latency: {} ms",
×
124
                        chain_metadata.best_block_height(),
×
125
                        latency.as_millis()
×
126
                    );
127

128
                    let is_synced = tip_info.is_synced;
×
129
                    let best_block_height = chain_metadata.best_block_height();
×
130
                    trace!(
×
131
                        target: LOG_TARGET,
×
132
                        "Base node Tip: {} ({}) Latency: {} ms",
×
133
                        best_block_height,
×
134
                        if is_synced { "Synced" } else { "Syncing..." },
×
135
                        latency.as_millis()
×
136
                    );
137

138
                    let tip_hash = chain_metadata.best_block_hash();
×
139
                    if last_checked_hash == *tip_hash {
×
140
                        // no new block, continue to the next iteration
141
                        continue;
×
142
                    }
×
143
                    last_checked_hash = *tip_hash;
×
144

×
145
                    // Tip changed: record the new state (this also publishes a state-changed event).
                    self
×
146
                        .update_state(BaseNodeState {
×
147
                            chain_metadata: Some(chain_metadata),
×
148
                            is_synced: Some(is_synced),
×
149
                            updated: Some(Utc::now()),
×
150
                            latency: Some(latency),
×
151
                        })
×
152
                        .await;
×
153

154

155

156
               }
157
            }
158
        }
159

160
        // loop only exits on shutdown/error
161
        #[allow(unreachable_code)]
162
        Ok(())
163
    }
×
164

165
    /// Overwrites the shared state with `new_state` and publishes a
    /// `BaseNodeEvent::BaseNodeStateChanged` event carrying the same state.
    ///
    /// Returns nothing. The write guard is a local that lives until the end of the function, so
    /// the event is sent while the write lock is still held.
166
    async fn update_state(&self, new_state: BaseNodeState) {
×
167
        let mut lock = self.state.write().await;
×
168

169
        // Store a clone so the original value can be moved into the event below.
        *lock = new_state.clone();
×
170

×
171
        self.publish_event(BaseNodeEvent::BaseNodeStateChanged(new_state));
×
172
    }
×
173

174
    /// Wraps `event` in an `Arc` and sends it on the event publisher.
    ///
    /// The value returned by `send` is bound to `_size` and otherwise ignored, so publishing is
    /// best-effort from this function's point of view.
    fn publish_event(&self, event: BaseNodeEvent) {
×
175
        let _size = self.event_publisher.send(Arc::new(event));
×
176
    }
×
177
}
178

179
/// Errors that can end the base node monitoring loop.
///
/// NOTE(review): in the code visible in this file only `InvalidBaseNodeResponse` is constructed
/// explicitly; the other two variants are reachable only through their `From` conversions.
#[derive(thiserror::Error, Debug)]
180
enum BaseNodeMonitorError {
181
    /// An RPC error, convertible from `RpcError` via `From`.
    #[error("Rpc error: {0}")]
182
    RpcFailed(#[from] RpcError),
183
    /// The base node's reply could not be used; the string describes why.
    #[error("Invalid base node response: {0}")]
184
    InvalidBaseNodeResponse(String),
185
    /// A wallet storage error, convertible from `WalletStorageError` via `From`.
    #[error("Wallet storage error: {0}")]
186
    WalletStorageError(#[from] WalletStorageError),
187
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc