
tari-project / tari / build 15280118615

27 May 2025 04:01PM UTC · coverage: 73.59% (+0.4%) from 73.233%

push · github · web-flow
feat: add base node HTTP wallet service (#7061)

Description
---
Added a new HTTP server for the base node that exposes some wallet-related
query functionality.

New endpoints (examples on the **esmeralda** network):
 - http://127.0.0.1:9005/get_tip_info
 - http://127.0.0.1:9005/get_header_by_height?height=6994
 - http://127.0.0.1:9005/get_height_at_time?time=1747739959

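To sanity-check these endpoints, a minimal client sketch is shown below. It is not part of this PR; it assumes the `reqwest` and `tokio` crates and the default Esmeralda port, and it simply prints the raw response bodies.

```rust
// Hypothetical client sketch (not part of this PR): queries the new HTTP
// wallet service on a locally running Esmeralda base node.
// Assumes `reqwest` and `tokio` as dependencies.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let base = "http://127.0.0.1:9005";

    // Current tip information.
    let tip = reqwest::get(format!("{base}/get_tip_info")).await?.text().await?;
    println!("get_tip_info: {tip}");

    // Header lookup by height.
    let header = reqwest::get(format!("{base}/get_header_by_height?height=6994"))
        .await?
        .text()
        .await?;
    println!("get_header_by_height: {header}");

    // Height corresponding to a unix timestamp.
    let height = reqwest::get(format!("{base}/get_height_at_time?time=1747739959"))
        .await?
        .text()
        .await?;
    println!("get_height_at_time: {height}");

    Ok(())
}
```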
Default ports for the HTTP service (by network):
```
MainNet: 9000,
StageNet: 9001,
NextNet: 9002,
LocalNet: 9003,
Igor: 9004,
Esmeralda: 9005,
```
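For illustration only, the table above can be expressed as a simple lookup. The sketch below is hypothetical: the function name and string keys are illustrative, and the real codebase likely keys this off its own network type.

```rust
/// Hypothetical helper mirroring the default-port table above.
/// String keys are purely illustrative; they do not reflect the crate's
/// actual network types.
fn default_http_wallet_port(network: &str) -> Option<u16> {
    match network {
        "mainnet" => Some(9000),
        "stagenet" => Some(9001),
        "nextnet" => Some(9002),
        "localnet" => Some(9003),
        "igor" => Some(9004),
        "esmeralda" => Some(9005),
        _ => None,
    }
}
```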

New configuration needs to be set in the base node:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000" # optional; if not set, requests for the external address return None and wallets cannot contact the base node over HTTP
```

Motivation and Context
---


How Has This Been Tested?
---
### Manually

#### Basic test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000"
```
This sets the port and the external address (the external address is returned
to the wallet client on request, so in the real world it must be publicly reachable)
3. Set the base node's logging level to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check the base node logs (with a `tail -f ...` command, for instance) and
confirm that the HTTP endpoints are being used

#### Use RPC fallback test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9001"
```
This sets the port and an external address that deliberately points at a different
port (9001) than the one the service listens on (9000), so the wallet's HTTP requests should fail and it should fall back to RPC
3. Set the base node's logging level to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check logs of base nod... (continued)

9 of 114 new or added lines in 4 files covered. (7.89%)

1592 existing lines in 62 files now uncovered.

82227 of 111736 relevant lines covered (73.59%)

272070.7 hits per line

Source File

/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs (14.29% covered)
```rust
// Copyright 2021. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    convert::{TryFrom, TryInto},
    sync::Arc,
    time::Instant,
};

use log::*;
use tari_comms::{
    peer_manager::NodeId,
    protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
    utils,
};
use tari_utilities::{hex::Hex, ByteArray};
use tokio::{sync::mpsc, task};

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    blocks::BlockHeader,
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
    proto,
    proto::base_node::{sync_utxos_response::Txo, SyncUtxosRequest, SyncUtxosResponse},
};

const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";

pub(crate) struct SyncUtxosTask<B> {
    db: AsyncBlockchainDb<B>,
    peer_node_id: Arc<NodeId>,
}

impl<B> SyncUtxosTask<B>
where B: BlockchainBackend + 'static
{
    pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
        Self { db, peer_node_id }
    }

    pub(crate) async fn run(
        self,
        request: Request<SyncUtxosRequest>,
        mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
    ) -> Result<(), RpcStatus> {
        let msg = request.into_message();
        let start_hash = msg
            .start_header_hash
            .clone()
            .try_into()
            .rpc_status_bad_request("Invalid header hash")?;

        let start_header = self
            .db
            .fetch_header_by_block_hash(start_hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("Start header hash was not found"))?;

        let end_hash = msg
            .end_header_hash
            .clone()
            .try_into()
            .rpc_status_bad_request("Invalid header hash")?;

        let end_header = self
            .db
            .fetch_header_by_block_hash(end_hash)
            .await
            .rpc_status_internal_error(LOG_TARGET)?
            .ok_or_else(|| RpcStatus::not_found("End header hash was not found"))?;
        if start_header.height > end_header.height {
            return Err(RpcStatus::bad_request(&format!(
                "Start header height({}) cannot be greater than the end header height({})",
                start_header.height, end_header.height
            )));
        }

        task::spawn(async move {
            debug!(
                target: LOG_TARGET,
                "Starting UTXO stream for peer '{}'", self.peer_node_id
            );
            if let Err(err) = self.start_streaming(&mut tx, start_header, end_header).await {
                debug!(
                    target: LOG_TARGET,
                    "UTXO stream errored for peer '{}': {}", self.peer_node_id, err
                );
                let _result = tx.send(Err(err)).await;
            }
            debug!(
                target: LOG_TARGET,
                "UTXO stream completed for peer '{}'", self.peer_node_id
            );
            #[cfg(feature = "metrics")]
            metrics::active_sync_peers().dec();
        });

        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn start_streaming(
        &self,
        tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
        mut current_header: BlockHeader,
        end_header: BlockHeader,
    ) -> Result<(), RpcStatus> {
        debug!(
            target: LOG_TARGET,
            "Starting stream task with current_header: {}, end_header: {}",
            current_header.hash().to_hex(),
            end_header.hash().to_hex(),
        );

        // If this is a pruned node and outputs have been requested for an initial sync, we need to discover and send
        // the outputs from the genesis block that have been pruned as well
        let mut pruned_genesis_block_outputs = Vec::new();
        let metadata = self
            .db
            .get_chain_metadata()
            .await
            .rpc_status_internal_error(LOG_TARGET)?;
        if current_header.height == 1 && metadata.is_pruned_node() {
            let genesis_block = self.db.fetch_genesis_block();
            for output in genesis_block.block().body.outputs() {
                let output_hash = output.hash();
                if self
                    .db
                    .fetch_output(output_hash)
                    .await
                    .rpc_status_internal_error(LOG_TARGET)?
                    .is_none()
                {
                    trace!(
                        target: LOG_TARGET,
                        "Spent genesis TXO (commitment '{}') to peer",
                        output.commitment.to_hex()
                    );
                    pruned_genesis_block_outputs.push(Ok(SyncUtxosResponse {
                        txo: Some(Txo::Commitment(output.commitment.as_bytes().to_vec())),
                        mined_header: current_header.hash().to_vec(),
                    }));
                }
            }
        }

        let start_header = current_header.clone();
        loop {
            let timer = Instant::now();
            let current_header_hash = current_header.hash();
            debug!(
                target: LOG_TARGET,
                "Streaming TXO(s) for block #{} ({})",
                current_header.height,
                current_header_hash.to_hex()
            );
            if tx.is_closed() {
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
                break;
            }

            let outputs_with_statuses = self
                .db
                .fetch_outputs_in_block_with_spend_state(current_header_hash, Some(end_header.hash()))
                .await
                .rpc_status_internal_error(LOG_TARGET)?;
            if tx.is_closed() {
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
                break;
            }

            let mut outputs = Vec::with_capacity(outputs_with_statuses.len());
            for (output, spent) in outputs_with_statuses {
                if output.is_burned() {
                    continue;
                }
                if !spent {
                    match proto::types::TransactionOutput::try_from(output.clone()) {
                        Ok(tx_ouput) => {
                            trace!(
                                target: LOG_TARGET,
                                "Unspent TXO (commitment '{}') to peer",
                                output.commitment.to_hex()
                            );
                            outputs.push(Ok(SyncUtxosResponse {
                                txo: Some(Txo::Output(tx_ouput)),
                                mined_header: current_header_hash.to_vec(),
                            }));
                        },
                        Err(e) => {
                            return Err(RpcStatus::general(&format!(
                                "Output '{}' RPC conversion error ({})",
                                output.hash().to_hex(),
                                e
                            )))
                        },
                    }
                }
            }
            debug!(
                target: LOG_TARGET,
                "Adding {} outputs in response for block #{} '{}'", outputs.len(),
                current_header.height,
                current_header_hash
            );

            let inputs_in_block = self
                .db
                .fetch_inputs_in_block(current_header_hash)
                .await
                .rpc_status_internal_error(LOG_TARGET)?;
            if tx.is_closed() {
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
                break;
            }

            let mut inputs = Vec::with_capacity(inputs_in_block.len());
            for input in inputs_in_block {
                let output_from_current_tranche = if let Some(mined_info) = self
                    .db
                    .fetch_output(input.output_hash())
                    .await
                    .rpc_status_internal_error(LOG_TARGET)?
                {
                    mined_info.mined_height >= start_header.height
                } else {
                    false
                };

                if output_from_current_tranche {
                    trace!(target: LOG_TARGET, "Spent TXO (hash '{}') not sent to peer", input.output_hash().to_hex());
                } else {
                    let input_commitment = match self.db.fetch_output(input.output_hash()).await {
                        Ok(Some(o)) => o.output.commitment,
                        Ok(None) => {
                            return Err(RpcStatus::general(&format!(
                                "Mined info for input '{}' not found",
                                input.output_hash().to_hex()
                            )))
                        },
                        Err(e) => {
                            return Err(RpcStatus::general(&format!(
                                "Input '{}' not found ({})",
                                input.output_hash().to_hex(),
                                e
                            )))
                        },
                    };
                    trace!(target: LOG_TARGET, "Spent TXO (commitment '{}') to peer", input_commitment.to_hex());
                    inputs.push(Ok(SyncUtxosResponse {
                        txo: Some(Txo::Commitment(input_commitment.as_bytes().to_vec())),
                        mined_header: current_header_hash.to_vec(),
                    }));
                }
            }
            debug!(
                target: LOG_TARGET,
                "Adding {} inputs in response for block #{} '{}'", inputs.len(),
                current_header.height,
                current_header_hash
            );

            let mut txos = Vec::with_capacity(outputs.len() + inputs.len());
            txos.append(&mut outputs);
            txos.append(&mut inputs);
            if start_header == current_header {
                debug!(
                    target: LOG_TARGET,
                    "Adding {} genesis block pruned inputs in response for block #{} '{}'", pruned_genesis_block_outputs.len(),
                    current_header.height,
                    current_header_hash
                );
                txos.append(&mut pruned_genesis_block_outputs);
            }
            let txos = txos.into_iter();

            // Ensure task stops if the peer prematurely stops their RPC session
            let txos_len = txos.len();
            if utils::mpsc::send_all(tx, txos).await.is_err() {
                break;
            }

            debug!(
                target: LOG_TARGET,
                "Streamed {} TXOs in {:.2?} (including stream backpressure)",
                txos_len,
                timer.elapsed()
            );

            if current_header.height + 1 > end_header.height {
                break;
            }

            current_header = self
                .db
                .fetch_header(current_header.height + 1)
                .await
                .rpc_status_internal_error(LOG_TARGET)?
                .ok_or_else(|| {
                    RpcStatus::general(&format!(
                        "Potential data consistency issue: header {} not found",
                        current_header.height + 1
                    ))
                })?;
        }

        debug!(
            target: LOG_TARGET,
            "TXO sync completed to Header hash = {}",
            current_header.hash().to_hex()
        );

        Ok(())
    }
}
```