• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 15280118615

27 May 2025 04:01PM UTC coverage: 73.59% (+0.4%) from 73.233%
15280118615

push

github

web-flow
feat: add base node HTTP wallet service (#7061)

Description
---
Added a new HTTP server for base node that exposes some wallet related
query functionality.

Current new endpoints (examples on **esmeralda** network):
 - http://127.0.0.1:9005/get_tip_info
 - http://127.0.0.1:9005/get_header_by_height?height=6994
 - http://127.0.0.1:9005/get_height_at_time?time=1747739959

Default ports for http service (by network):
```
MainNet: 9000,
StageNet: 9001,
NextNet: 9002,
LocalNet: 9003,
Igor: 9004,
Esmeralda: 9005,
```

New configuration needs to be set in base node:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000" # optional; if it is not set, a request for the external address returns None, so wallets cannot contact the base node
```

Motivation and Context
---


How Has This Been Tested?
---
### Manually

#### Basic test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9000"
```
This sets the port and the external address (the external address is sent to
the wallet client on request, so in a real deployment it must be public)
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check logs of base node (with `tail -f ...` command for instance) and
see that the HTTP endpoints are used

#### Use RPC fallback test
1. Build new base node
2. Set base node configuration by adding the following:
```toml
[base_node.http_wallet_query_service]
port = 9000
external_address = "http://127.0.0.1:9001"
```
This sets the port and the external address (the external address is sent to
the wallet client on request, so in a real deployment it must be public)
3. Set logging level of base node logs to DEBUG
4. Start base node
5. Build and start console wallet
6. See that it is still able to synchronize
7. Check logs of base nod... (continued)

9 of 114 new or added lines in 4 files covered. (7.89%)

1592 existing lines in 62 files now uncovered.

82227 of 111736 relevant lines covered (73.59%)

272070.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.19
/base_layer/core/src/base_node/rpc/service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::convert::{TryFrom, TryInto};
5

6
use log::*;
7
use tari_common_types::types::{FixedHash, Signature};
8
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming};
9
use tari_utilities::hex::Hex;
10
use tokio::sync::mpsc;
11
use url::Url;
12

13
use crate::{
14
    base_node::{
15
        rpc::{sync_utxos_by_block_task::SyncUtxosByBlockTask, BaseNodeWalletService},
16
        state_machine_service::states::StateInfo,
17
        StateMachineHandle,
18
    },
19
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
20
    mempool::{service::MempoolHandle, TxStorageResponse},
21
    proto,
22
    proto::{
23
        base_node::{
24
            FetchMatchingUtxos,
25
            FetchUtxosResponse,
26
            GetMempoolFeePerGramStatsRequest,
27
            GetMempoolFeePerGramStatsResponse,
28
            GetWalletQueryHttpServiceAddressResponse,
29
            QueryDeletedData,
30
            QueryDeletedRequest,
31
            QueryDeletedResponse,
32
            Signatures as SignaturesProto,
33
            SyncUtxosByBlockRequest,
34
            SyncUtxosByBlockResponse,
35
            TipInfoResponse,
36
            TxLocation,
37
            TxQueryBatchResponse,
38
            TxQueryBatchResponses,
39
            TxQueryResponse,
40
            TxSubmissionRejectionReason,
41
            TxSubmissionResponse,
42
            UtxoQueryRequest,
43
            UtxoQueryResponse,
44
            UtxoQueryResponses,
45
        },
46
        types::{Signature as SignatureProto, Transaction as TransactionProto},
47
    },
48
    transactions::transaction_components::Transaction,
49
};
50

51
/// Log target passed to every log macro and `rpc_status_internal_error` call in this module.
const LOG_TARGET: &str = "c::base_node::rpc";
/// Largest number of hashes a single `query_deleted` request may carry before it is rejected.
const MAX_QUERY_DELETED_HASHES: usize = 1000;
53

54
pub struct BaseNodeWalletRpcService<B> {
55
    db: AsyncBlockchainDb<B>,
56
    mempool: MempoolHandle,
57
    state_machine: StateMachineHandle,
58
    wallet_query_service_address: Option<Url>,
59
}
60

61
impl<B: BlockchainBackend + 'static> BaseNodeWalletRpcService<B> {
62
    pub fn new(
3✔
63
        db: AsyncBlockchainDb<B>,
3✔
64
        mempool: MempoolHandle,
3✔
65
        state_machine: StateMachineHandle,
3✔
66
        wallet_query_service_address: Option<Url>,
3✔
67
    ) -> Self {
3✔
68
        Self {
3✔
69
            db,
3✔
70
            mempool,
3✔
71
            state_machine,
3✔
72
            wallet_query_service_address,
3✔
73
        }
3✔
74
    }
3✔
75

76
    #[inline]
77
    fn db(&self) -> AsyncBlockchainDb<B> {
86✔
78
        self.db.clone()
86✔
79
    }
86✔
80

81
    #[inline]
82
    pub fn mempool(&self) -> MempoolHandle {
8✔
83
        self.mempool.clone()
8✔
84
    }
8✔
85

86
    #[inline]
87
    pub fn state_machine(&self) -> StateMachineHandle {
16✔
88
        self.state_machine.clone()
16✔
89
    }
16✔
90

91
    async fn fetch_kernel(&self, signature: Signature) -> Result<TxQueryResponse, RpcStatus> {
6✔
92
        let db = self.db();
6✔
93
        let chain_metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;
6✔
94
        let state_machine = self.state_machine();
6✔
95

6✔
96
        // Determine if we are synced
6✔
97
        let status_watch = state_machine.get_status_info_watch();
6✔
98
        let is_synced = match (status_watch.borrow()).state_info {
6✔
99
            StateInfo::Listening(li) => li.is_synced(),
6✔
100
            _ => false,
×
101
        };
102
        match db
6✔
103
            .fetch_kernel_by_excess_sig(signature.clone())
6✔
104
            .await
6✔
105
            .rpc_status_internal_error(LOG_TARGET)?
6✔
106
        {
107
            None => (),
4✔
108
            Some((_, block_hash)) => {
2✔
109
                match db
2✔
110
                    .fetch_header_by_block_hash(block_hash)
2✔
111
                    .await
2✔
112
                    .rpc_status_internal_error(LOG_TARGET)?
2✔
113
                {
114
                    None => (),
×
115
                    Some(header) => {
2✔
116
                        let confirmations = chain_metadata.best_block_height().saturating_sub(header.height);
2✔
117
                        let response = TxQueryResponse {
2✔
118
                            location: TxLocation::Mined as i32,
2✔
119
                            best_block_hash: block_hash.to_vec(),
2✔
120
                            confirmations,
2✔
121
                            is_synced,
2✔
122
                            best_block_height: chain_metadata.best_block_height(),
2✔
123
                            mined_timestamp: header.timestamp.as_u64(),
2✔
124
                        };
2✔
125
                        return Ok(response);
2✔
126
                    },
127
                }
128
            },
129
        };
130

131
        // If not in a block then check the mempool
132
        let mut mempool = self.mempool();
4✔
133
        let mempool_response = match mempool
4✔
134
            .get_tx_state_by_excess_sig(signature.clone())
4✔
135
            .await
4✔
136
            .rpc_status_internal_error(LOG_TARGET)?
4✔
137
        {
138
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
2✔
139
                location: TxLocation::InMempool as i32,
2✔
140
                best_block_hash: vec![],
2✔
141
                confirmations: 0,
2✔
142
                is_synced,
2✔
143
                best_block_height: chain_metadata.best_block_height(),
2✔
144
                mined_timestamp: 0,
2✔
145
            },
2✔
146
            TxStorageResponse::ReorgPool |
147
            TxStorageResponse::NotStoredOrphan |
148
            TxStorageResponse::NotStoredTimeLocked |
149
            TxStorageResponse::NotStoredAlreadySpent |
150
            TxStorageResponse::NotStoredConsensus |
151
            TxStorageResponse::NotStored |
152
            TxStorageResponse::NotStoredFeeTooLow |
153
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
2✔
154
                location: TxLocation::NotStored as i32,
2✔
155
                best_block_hash: vec![],
2✔
156
                confirmations: 0,
2✔
157
                is_synced,
2✔
158
                best_block_height: chain_metadata.best_block_height(),
2✔
159
                mined_timestamp: 0,
2✔
160
            },
2✔
161
        };
162
        Ok(mempool_response)
4✔
163
    }
6✔
164
}
165

166
#[tari_comms::async_trait]
167
impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpcService<B> {
168
    async fn submit_transaction(
169
        &self,
170
        request: Request<TransactionProto>,
171
    ) -> Result<Response<TxSubmissionResponse>, RpcStatus> {
4✔
172
        let message = request.into_message();
4✔
173
        let transaction =
4✔
174
            Transaction::try_from(message).map_err(|_| RpcStatus::bad_request("Transaction was invalid"))?;
4✔
175
        let mut mempool = self.mempool();
4✔
176
        let state_machine = self.state_machine();
4✔
177

4✔
178
        // Determine if we are synced
4✔
179
        let status_watch = state_machine.get_status_info_watch();
4✔
180
        let is_synced = match (status_watch.borrow()).state_info {
4✔
181
            StateInfo::Listening(li) => li.is_synced(),
4✔
182
            _ => false,
×
183
        };
184

185
        let response = match mempool
4✔
186
            .submit_transaction(transaction.clone())
4✔
187
            .await
4✔
188
            .rpc_status_internal_error(LOG_TARGET)?
4✔
189
        {
190
            TxStorageResponse::UnconfirmedPool => TxSubmissionResponse {
1✔
191
                accepted: true,
1✔
192
                rejection_reason: TxSubmissionRejectionReason::None.into(),
1✔
193
                is_synced,
1✔
194
            },
1✔
195

196
            TxStorageResponse::NotStoredOrphan => TxSubmissionResponse {
1✔
197
                accepted: false,
1✔
198
                rejection_reason: TxSubmissionRejectionReason::Orphan.into(),
1✔
199
                is_synced,
1✔
200
            },
1✔
201
            TxStorageResponse::NotStoredFeeTooLow => TxSubmissionResponse {
×
202
                accepted: false,
×
203
                rejection_reason: TxSubmissionRejectionReason::FeeTooLow.into(),
×
204
                is_synced,
×
205
            },
×
206
            TxStorageResponse::NotStoredTimeLocked => TxSubmissionResponse {
×
207
                accepted: false,
×
208
                rejection_reason: TxSubmissionRejectionReason::TimeLocked.into(),
×
209
                is_synced,
×
210
            },
×
211
            TxStorageResponse::NotStoredConsensus | TxStorageResponse::NotStored => TxSubmissionResponse {
×
212
                accepted: false,
×
213
                rejection_reason: TxSubmissionRejectionReason::ValidationFailed.into(),
×
214
                is_synced,
×
215
            },
×
216
            TxStorageResponse::NotStoredAlreadySpent |
217
            TxStorageResponse::ReorgPool |
218
            TxStorageResponse::NotStoredAlreadyMined => {
219
                // Is this transaction a double spend or has this transaction been mined?
220
                match transaction.first_kernel_excess_sig() {
2✔
221
                    None => TxSubmissionResponse {
×
222
                        accepted: false,
×
223
                        rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
×
224
                        is_synced,
×
225
                    },
×
226
                    Some(s) => {
2✔
227
                        // Check to see if the kernel exists in the blockchain db in which case this exact transaction
2✔
228
                        // already exists in the chain, otherwise it is a double spend
2✔
229
                        let db = self.db();
2✔
230
                        match db
2✔
231
                            .fetch_kernel_by_excess_sig(s.clone())
2✔
232
                            .await
2✔
233
                            .rpc_status_internal_error(LOG_TARGET)?
2✔
234
                        {
235
                            None => TxSubmissionResponse {
1✔
236
                                accepted: false,
1✔
237
                                rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
1✔
238
                                is_synced,
1✔
239
                            },
1✔
240
                            Some(_) => TxSubmissionResponse {
1✔
241
                                accepted: false,
1✔
242
                                rejection_reason: TxSubmissionRejectionReason::AlreadyMined.into(),
1✔
243
                                is_synced,
1✔
244
                            },
1✔
245
                        }
246
                    },
247
                }
248
            },
249
        };
250
        Ok(Response::new(response))
4✔
251
    }
8✔
252

253
    async fn transaction_query(
254
        &self,
255
        request: Request<SignatureProto>,
256
    ) -> Result<Response<TxQueryResponse>, RpcStatus> {
4✔
257
        let state_machine = self.state_machine();
4✔
258

4✔
259
        // Determine if we are synced
4✔
260
        let status_watch = state_machine.get_status_info_watch();
4✔
261
        let is_synced = match status_watch.borrow().state_info {
4✔
262
            StateInfo::Listening(li) => li.is_synced(),
4✔
263
            _ => false,
×
264
        };
265

266
        let message = request.into_message();
4✔
267
        let signature = Signature::try_from(message).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
4✔
268

269
        let mut response = self.fetch_kernel(signature).await?;
4✔
270
        response.is_synced = is_synced;
4✔
271
        Ok(Response::new(response))
4✔
272
    }
8✔
273

274
    async fn transaction_batch_query(
275
        &self,
276
        request: Request<SignaturesProto>,
277
    ) -> Result<Response<TxQueryBatchResponses>, RpcStatus> {
1✔
278
        let state_machine = self.state_machine();
1✔
279

1✔
280
        // Determine if we are synced
1✔
281
        let status_watch = state_machine.get_status_info_watch();
1✔
282
        let is_synced = match (status_watch.borrow()).state_info {
1✔
283
            StateInfo::Listening(li) => li.is_synced(),
1✔
284
            _ => false,
×
285
        };
286

287
        let message = request.into_message();
1✔
288

1✔
289
        let mut responses: Vec<TxQueryBatchResponse> = Vec::new();
1✔
290

291
        let metadata = self
1✔
292
            .db
1✔
293
            .get_chain_metadata()
1✔
294
            .await
1✔
295
            .rpc_status_internal_error(LOG_TARGET)?;
1✔
296

297
        for sig in message.sigs {
3✔
298
            let signature = Signature::try_from(sig).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
2✔
299
            let response: TxQueryResponse = self.fetch_kernel(signature.clone()).await?;
2✔
300
            responses.push(TxQueryBatchResponse {
2✔
301
                signature: Some(SignatureProto::from(signature)),
2✔
302
                location: response.location,
2✔
303
                best_block_hash: response.best_block_hash,
2✔
304
                confirmations: response.confirmations,
2✔
305
                best_block_height: response.best_block_height.saturating_sub(response.confirmations),
2✔
306
                mined_timestamp: response.mined_timestamp,
2✔
307
            });
2✔
308
        }
309
        Ok(Response::new(TxQueryBatchResponses {
1✔
310
            responses,
1✔
311
            is_synced,
1✔
312
            best_block_hash: metadata.best_block_hash().to_vec(),
1✔
313
            best_block_height: metadata.best_block_height(),
1✔
314
            tip_mined_timestamp: metadata.timestamp(),
1✔
315
        }))
1✔
316
    }
2✔
317

318
    async fn fetch_matching_utxos(
319
        &self,
320
        request: Request<FetchMatchingUtxos>,
321
    ) -> Result<Response<FetchUtxosResponse>, RpcStatus> {
1✔
322
        let message = request.into_message();
1✔
323

1✔
324
        let state_machine = self.state_machine();
1✔
325
        // Determine if we are synced
1✔
326
        let status_watch = state_machine.get_status_info_watch();
1✔
327
        let is_synced = match (status_watch.borrow()).state_info {
1✔
328
            StateInfo::Listening(li) => li.is_synced(),
1✔
329
            _ => false,
×
330
        };
331

332
        let db = self.db();
1✔
333
        let mut res = Vec::with_capacity(message.output_hashes.len());
1✔
334
        let hashes: Vec<FixedHash> = message
1✔
335
            .output_hashes
1✔
336
            .into_iter()
1✔
337
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
4✔
338
            .collect::<Result<_, _>>()
1✔
339
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
1✔
340
        let utxos = db
1✔
341
            .fetch_outputs_with_spend_status_at_tip(hashes)
1✔
342
            .await
1✔
343
            .rpc_status_internal_error(LOG_TARGET)?
1✔
344
            .into_iter()
1✔
345
            .flatten();
1✔
346
        for (output, spent) in utxos {
4✔
347
            if !spent {
3✔
348
                res.push(output);
3✔
349
            }
3✔
350
        }
351

352
        Ok(Response::new(FetchUtxosResponse {
353
            outputs: res
1✔
354
                .into_iter()
1✔
355
                .map(TryInto::try_into)
1✔
356
                .collect::<Result<Vec<_>, String>>()
1✔
357
                .map_err(|err| RpcStatus::bad_request(&err))?,
1✔
358
            is_synced,
1✔
359
        }))
360
    }
2✔
361

362
    async fn utxo_query(&self, request: Request<UtxoQueryRequest>) -> Result<Response<UtxoQueryResponses>, RpcStatus> {
×
363
        let message = request.into_message();
×
364
        if message.output_hashes.is_empty() {
×
365
            return Err(RpcStatus::bad_request("Empty output hashes"));
×
366
        }
×
367
        const MAX_ALLOWED_QUERY_SIZE: usize = 512;
368
        if message.output_hashes.len() > MAX_ALLOWED_QUERY_SIZE {
×
369
            return Err(RpcStatus::bad_request(&format!(
×
370
                "Exceeded maximum allowed query hashes. Max: {}",
×
371
                MAX_ALLOWED_QUERY_SIZE
×
372
            )));
×
373
        }
×
374

×
375
        let db = self.db();
×
376

×
377
        debug!(
×
378
            target: LOG_TARGET,
×
379
            "Querying {} UTXO(s) for mined state",
×
380
            message.output_hashes.len(),
×
381
        );
382
        let hashes: Vec<FixedHash> = message
×
383
            .output_hashes
×
384
            .into_iter()
×
385
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
386
            .collect::<Result<_, _>>()
×
387
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
388
        trace!(
×
389
            target: LOG_TARGET,
×
390
            "UTXO hashes queried from wallet: {:?}",
×
391
            hashes.iter().map(|h| h.to_hex()).collect::<Vec<String>>()
×
392
        );
393

394
        let mined_info_resp = db
×
395
            .fetch_outputs_mined_info(hashes)
×
396
            .await
×
397
            .rpc_status_internal_error(LOG_TARGET)?;
×
398

399
        let num_mined = mined_info_resp.iter().filter(|opt| opt.is_some()).count();
×
400
        debug!(
×
401
            target: LOG_TARGET,
×
402
            "Found {} mined and {} unmined UTXO(s)",
×
403
            num_mined,
×
404
            mined_info_resp.len() - num_mined
×
405
        );
406
        let metadata = self
×
407
            .db
×
408
            .get_chain_metadata()
×
409
            .await
×
410
            .rpc_status_internal_error(LOG_TARGET)?;
×
411

412
        Ok(Response::new(UtxoQueryResponses {
413
            best_block_height: metadata.best_block_height(),
×
414
            best_block_hash: metadata.best_block_hash().to_vec(),
×
415
            responses: mined_info_resp
×
416
                .into_iter()
×
417
                .flatten()
×
418
                .map(|utxo| {
×
419
                    Ok(UtxoQueryResponse {
×
420
                        mined_at_height: utxo.mined_height,
×
421
                        mined_in_block: utxo.header_hash.to_vec(),
×
422
                        output_hash: utxo.output.hash().to_vec(),
×
423
                        output: match utxo.output.try_into() {
×
424
                            Ok(output) => Some(output),
×
425
                            Err(err) => {
×
426
                                return Err(err);
×
427
                            },
428
                        },
429
                        mined_timestamp: utxo.mined_timestamp,
×
430
                    })
431
                })
×
432
                .collect::<Result<Vec<_>, String>>()
×
433
                .map_err(|err| RpcStatus::bad_request(&err))?,
×
434
        }))
435
    }
×
436

437
    async fn query_deleted(
438
        &self,
439
        request: Request<QueryDeletedRequest>,
440
    ) -> Result<Response<QueryDeletedResponse>, RpcStatus> {
×
441
        let message = request.into_message();
×
442
        if message.hashes.len() > MAX_QUERY_DELETED_HASHES {
×
443
            return Err(RpcStatus::bad_request(
×
444
                &"Received more hashes than we allow".to_string(),
×
445
            ));
×
446
        }
×
447
        let chain_include_header = message.chain_must_include_header;
×
448
        if !chain_include_header.is_empty() {
×
449
            let hash = chain_include_header
×
450
                .try_into()
×
451
                .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
452
            if self
×
453
                .db
×
454
                .fetch_header_by_block_hash(hash)
×
455
                .await
×
456
                .rpc_status_internal_error(LOG_TARGET)?
×
457
                .is_none()
×
458
            {
459
                return Err(RpcStatus::not_found(
×
460
                    "Chain does not include header. It might have been reorged out",
×
461
                ));
×
462
            }
×
463
        }
×
464
        let hashes: Vec<FixedHash> = message
×
465
            .hashes
×
466
            .into_iter()
×
467
            .map(|hash| hash.try_into())
×
468
            .collect::<Result<_, _>>()
×
469
            .map_err(|_| RpcStatus::bad_request(&"Malformed utxo hash received".to_string()))?;
×
470
        let mut return_data = Vec::with_capacity(hashes.len());
×
471
        let utxos = self
×
472
            .db
×
473
            .fetch_outputs_mined_info(hashes.clone())
×
474
            .await
×
475
            .rpc_status_internal_error(LOG_TARGET)?;
×
476
        let txos = self
×
477
            .db
×
478
            .fetch_inputs_mined_info(hashes)
×
479
            .await
×
480
            .rpc_status_internal_error(LOG_TARGET)?;
×
481
        if utxos.len() != txos.len() {
×
482
            return Err(RpcStatus::general("database returned different inputs vs outputs"));
×
483
        }
×
484
        for (utxo, txo) in utxos.iter().zip(txos.iter()) {
×
485
            let mut data = match utxo {
×
486
                None => QueryDeletedData {
×
487
                    mined_at_height: 0,
×
488
                    block_mined_in: Vec::new(),
×
489
                    height_deleted_at: 0,
×
490
                    block_deleted_in: Vec::new(),
×
491
                },
×
492
                Some(u) => QueryDeletedData {
×
493
                    mined_at_height: u.mined_height,
×
494
                    block_mined_in: u.header_hash.to_vec(),
×
495
                    height_deleted_at: 0,
×
496
                    block_deleted_in: Vec::new(),
×
497
                },
×
498
            };
499
            if let Some(input) = txo {
×
500
                data.height_deleted_at = input.spent_height;
×
501
                data.block_deleted_in = input.header_hash.to_vec();
×
502
            };
×
503
            return_data.push(data);
×
504
        }
505
        let metadata = self
×
506
            .db
×
507
            .get_chain_metadata()
×
508
            .await
×
509
            .rpc_status_internal_error(LOG_TARGET)?;
×
510

511
        Ok(Response::new(QueryDeletedResponse {
×
512
            best_block_height: metadata.best_block_height(),
×
513
            best_block_hash: metadata.best_block_hash().to_vec(),
×
514
            data: return_data,
×
515
        }))
×
516
    }
×
517

518
    async fn get_tip_info(&self, _request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus> {
×
519
        let state_machine = self.state_machine();
×
520
        let status_watch = state_machine.get_status_info_watch();
×
521
        let is_synced = match status_watch.borrow().state_info {
×
522
            StateInfo::Listening(li) => li.is_synced(),
×
523
            _ => false,
×
524
        };
525

526
        let metadata = self
×
527
            .db
×
528
            .get_chain_metadata()
×
529
            .await
×
530
            .rpc_status_internal_error(LOG_TARGET)?;
×
531

532
        Ok(Response::new(TipInfoResponse {
×
533
            metadata: Some(metadata.into()),
×
534
            is_synced,
×
535
        }))
×
536
    }
×
537

538
    async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
539
        let height = request.into_message();
×
540
        let header = self
×
541
            .db()
×
542
            .fetch_header(height)
×
543
            .await
×
544
            .rpc_status_internal_error(LOG_TARGET)?
×
545
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {}", height)))?;
×
546

547
        Ok(Response::new(header.into()))
×
548
    }
×
549

550
    async fn get_header_by_height(
551
        &self,
552
        request: Request<u64>,
553
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
554
        let height = request.into_message();
×
555
        let header = self
×
556
            .db()
×
557
            .fetch_header(height)
×
558
            .await
×
559
            .rpc_status_internal_error(LOG_TARGET)?
×
560
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {}", height)))?;
×
561

562
        Ok(Response::new(header.into()))
×
563
    }
×
564

565
    async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus> {
9✔
566
        let requested_epoch_time: u64 = request.into_message();
9✔
567
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", requested_epoch_time);
9✔
568
        let tip_header = self
9✔
569
            .db()
9✔
570
            .fetch_tip_header()
9✔
571
            .await
9✔
572
            .rpc_status_internal_error(LOG_TARGET)?;
9✔
573

574
        let mut left_height = 0u64;
9✔
575
        let mut right_height = tip_header.height();
9✔
576
        trace!(
9✔
577
            target: LOG_TARGET,
×
578
            "requested_epoch_time: {}, left: {}, right: {}",
×
579
            requested_epoch_time,
580
            left_height,
581
            right_height
582
        );
583

584
        while left_height <= right_height {
34✔
585
            let mut mid_height = (left_height + right_height) / 2;
34✔
586

34✔
587
            if mid_height == 0 {
34✔
588
                return Ok(Response::new(0u64));
1✔
589
            }
33✔
590
            // If the two bounds are adjacent then perform the test between the right and left sides
33✔
591
            if left_height == mid_height {
33✔
592
                mid_height = right_height;
4✔
593
            }
29✔
594

595
            let mid_header = self
33✔
596
                .db()
33✔
597
                .fetch_header(mid_height)
33✔
598
                .await
33✔
599
                .rpc_status_internal_error(LOG_TARGET)?
33✔
600
                .ok_or_else(|| {
33✔
601
                    RpcStatus::not_found(&format!("Header not found during search at height {}", mid_height))
×
602
                })?;
33✔
603
            let before_mid_header = self
33✔
604
                .db()
33✔
605
                .fetch_header(mid_height - 1)
33✔
606
                .await
33✔
607
                .rpc_status_internal_error(LOG_TARGET)?
33✔
608
                .ok_or_else(|| {
33✔
609
                    RpcStatus::not_found(&format!("Header not found during search at height {}", mid_height - 1))
×
610
                })?;
33✔
611
            trace!(
33✔
612
                target: LOG_TARGET,
×
613
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
614
                requested_epoch_time,
×
615
                left_height,
×
616
                mid_height,
×
617
                mid_height-1,
×
618
                mid_header.timestamp.as_u64(),
×
619
                before_mid_header.timestamp.as_u64(),
×
620
                right_height
621
            );
622
            if requested_epoch_time < mid_header.timestamp.as_u64() &&
33✔
623
                requested_epoch_time >= before_mid_header.timestamp.as_u64()
12✔
624
            {
625
                trace!(
5✔
626
                    target: LOG_TARGET,
×
627
                    "requested_epoch_time: {}, selected height: {}",
×
628
                    requested_epoch_time, before_mid_header.height
629
                );
630
                return Ok(Response::new(before_mid_header.height));
5✔
631
            } else if mid_height == right_height {
28✔
632
                trace!(
3✔
633
                    target: LOG_TARGET,
×
634
                    "requested_epoch_time: {}, selected height: {}",
×
635
                    requested_epoch_time, right_height
636
                );
637
                return Ok(Response::new(right_height));
3✔
638
            } else if requested_epoch_time <= mid_header.timestamp.as_u64() {
25✔
639
                right_height = mid_height;
8✔
640
            } else {
17✔
641
                left_height = mid_height;
17✔
642
            }
17✔
643
        }
644

645
        Ok(Response::new(0u64))
×
646
    }
18✔
647

648
    async fn sync_utxos_by_block(
649
        &self,
650
        request: Request<SyncUtxosByBlockRequest>,
651
    ) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus> {
2✔
652
        let req = request.message();
2✔
653
        let peer = request.context().peer_node_id();
2✔
654
        debug!(
2✔
655
            target: LOG_TARGET,
×
656
            "Received sync_utxos_by_block request from {} from header {} to {} ",
×
657
            peer,
×
658
            req.start_header_hash.to_hex(),
×
659
            req.end_header_hash.to_hex(),
×
660
        );
661

662
        // Number of blocks to load and push to the stream before loading the next batch. Most blocks have 1 output but
663
        // full blocks will have 500
664
        const BATCH_SIZE: usize = 5;
665
        let (tx, rx) = mpsc::channel(BATCH_SIZE);
2✔
666
        let task = SyncUtxosByBlockTask::new(self.db());
2✔
667
        task.run(request.into_message(), tx).await?;
2✔
668

669
        Ok(Streaming::new(rx))
2✔
670
    }
4✔
671

672
    async fn get_mempool_fee_per_gram_stats(
673
        &self,
674
        request: Request<GetMempoolFeePerGramStatsRequest>,
675
    ) -> Result<Response<GetMempoolFeePerGramStatsResponse>, RpcStatus> {
×
676
        let req = request.into_message();
×
677
        let count =
×
678
            usize::try_from(req.count).map_err(|_| RpcStatus::bad_request("count must be less than or equal to 20"))?;
×
679

680
        if count > 20 {
×
681
            return Err(RpcStatus::bad_request("count must be less than or equal to 20"));
×
682
        }
×
683

684
        let metadata = self
×
685
            .db
×
686
            .get_chain_metadata()
×
687
            .await
×
688
            .rpc_status_internal_error(LOG_TARGET)?;
×
689
        let stats = self
×
690
            .mempool()
×
691
            .get_fee_per_gram_stats(count, metadata.best_block_height())
×
692
            .await
×
693
            .rpc_status_internal_error(LOG_TARGET)?;
×
694

695
        Ok(Response::new(stats.into()))
×
696
    }
×
697

698
    async fn get_wallet_query_http_service_address(
699
        &self,
700
        _request: Request<()>,
NEW
701
    ) -> Result<Response<GetWalletQueryHttpServiceAddressResponse>, RpcStatus> {
×
NEW
702
        Ok(Response::new(GetWalletQueryHttpServiceAddressResponse {
×
NEW
703
            http_address: self
×
NEW
704
                .wallet_query_service_address
×
NEW
705
                .clone()
×
NEW
706
                .map(|url| url.to_string())
×
NEW
707
                .unwrap_or_default(),
×
NEW
708
        }))
×
NEW
709
    }
×
710
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc