• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 17033178607

18 Aug 2025 06:45AM UTC coverage: 54.49% (-0.007%) from 54.497%
17033178607

push

github

stringhandler
Merge branch 'development' of github.com:tari-project/tari into odev

971 of 2923 new or added lines in 369 files covered. (33.22%)

5804 existing lines in 173 files now uncovered.

76688 of 140739 relevant lines covered (54.49%)

193850.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::convert::{TryFrom, TryInto};
5

6
use log::*;
7
use tari_common_types::types::{FixedHash, Signature};
8
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming};
9
use tari_utilities::hex::Hex;
10
use tokio::sync::mpsc;
11
use url::Url;
12

13
use crate::{
14
    base_node::{
15
        rpc::{sync_utxos_by_block_task::SyncUtxosByBlockTask, BaseNodeWalletService},
16
        state_machine_service::states::StateInfo,
17
        StateMachineHandle,
18
    },
19
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
20
    mempool::{service::MempoolHandle, TxStorageResponse},
21
    proto,
22
    proto::{
23
        base_node::{
24
            FetchMatchingUtxos,
25
            FetchUtxosResponse,
26
            GetMempoolFeePerGramStatsRequest,
27
            GetMempoolFeePerGramStatsResponse,
28
            GetWalletQueryHttpServiceAddressResponse,
29
            QueryDeletedData,
30
            QueryDeletedRequest,
31
            QueryDeletedResponse,
32
            Signatures as SignaturesProto,
33
            SyncUtxosByBlockRequest,
34
            SyncUtxosByBlockResponse,
35
            TipInfoResponse,
36
            TxLocation,
37
            TxQueryBatchResponse,
38
            TxQueryBatchResponses,
39
            TxQueryResponse,
40
            TxSubmissionRejectionReason,
41
            TxSubmissionResponse,
42
            UtxoQueryRequest,
43
            UtxoQueryResponse,
44
            UtxoQueryResponses,
45
        },
46
        types::{Signature as SignatureProto, Transaction as TransactionProto},
47
    },
48
    transactions::transaction_components::Transaction,
49
};
50

51
/// Log target passed to the logging macros and to `rpc_status_internal_error` in this file.
const LOG_TARGET: &str = "c::base_node::rpc";
52
/// Maximum number of hashes a single `query_deleted` request may carry before it is rejected.
const MAX_QUERY_DELETED_HASHES: usize = 1000;
53

54
pub struct BaseNodeWalletRpcService<B> {
55
    db: AsyncBlockchainDb<B>,
56
    mempool: MempoolHandle,
57
    state_machine: StateMachineHandle,
58
    wallet_query_service_address: Option<Url>,
59
}
60

61
impl<B: BlockchainBackend + 'static> BaseNodeWalletRpcService<B> {
62
    pub fn new(
×
63
        db: AsyncBlockchainDb<B>,
×
64
        mempool: MempoolHandle,
×
65
        state_machine: StateMachineHandle,
×
66
        wallet_query_service_address: Option<Url>,
×
67
    ) -> Self {
×
68
        Self {
×
69
            db,
×
70
            mempool,
×
71
            state_machine,
×
72
            wallet_query_service_address,
×
73
        }
×
74
    }
×
75

76
    #[inline]
77
    fn db(&self) -> AsyncBlockchainDb<B> {
×
78
        self.db.clone()
×
79
    }
×
80

81
    #[inline]
82
    pub fn mempool(&self) -> MempoolHandle {
×
83
        self.mempool.clone()
×
84
    }
×
85

86
    #[inline]
87
    pub fn state_machine(&self) -> StateMachineHandle {
×
88
        self.state_machine.clone()
×
89
    }
×
90

91
    async fn fetch_kernel(&self, signature: Signature) -> Result<TxQueryResponse, RpcStatus> {
×
92
        let db = self.db();
×
93
        let chain_metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;
×
94
        let state_machine = self.state_machine();
×
95

×
96
        // Determine if we are synced
×
97
        let status_watch = state_machine.get_status_info_watch();
×
98
        let is_synced = match (status_watch.borrow()).state_info {
×
99
            StateInfo::Listening(li) => li.is_synced(),
×
100
            _ => false,
×
101
        };
102
        match db
×
103
            .fetch_kernel_by_excess_sig(signature.clone())
×
104
            .await
×
105
            .rpc_status_internal_error(LOG_TARGET)?
×
106
        {
107
            None => (),
×
108
            Some((_, block_hash)) => {
×
109
                match db
×
110
                    .fetch_header_by_block_hash(block_hash)
×
111
                    .await
×
112
                    .rpc_status_internal_error(LOG_TARGET)?
×
113
                {
114
                    None => (),
×
115
                    Some(header) => {
×
116
                        let confirmations = chain_metadata.best_block_height().saturating_sub(header.height);
×
117
                        let response = TxQueryResponse {
×
118
                            location: TxLocation::Mined as i32,
×
119
                            best_block_hash: block_hash.to_vec(),
×
120
                            confirmations,
×
121
                            is_synced,
×
122
                            best_block_height: chain_metadata.best_block_height(),
×
123
                            mined_timestamp: header.timestamp.as_u64(),
×
124
                        };
×
125
                        return Ok(response);
×
126
                    },
127
                }
128
            },
129
        };
130

131
        // If not in a block then check the mempool
132
        let mut mempool = self.mempool();
×
133
        let mempool_response = match mempool
×
134
            .get_tx_state_by_excess_sig(signature.clone())
×
135
            .await
×
136
            .rpc_status_internal_error(LOG_TARGET)?
×
137
        {
138
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
×
139
                location: TxLocation::InMempool as i32,
×
140
                best_block_hash: vec![],
×
141
                confirmations: 0,
×
142
                is_synced,
×
143
                best_block_height: chain_metadata.best_block_height(),
×
144
                mined_timestamp: 0,
×
145
            },
×
146
            TxStorageResponse::ReorgPool |
147
            TxStorageResponse::NotStoredOrphan |
148
            TxStorageResponse::NotStoredTimeLocked |
149
            TxStorageResponse::NotStoredAlreadySpent |
150
            TxStorageResponse::NotStoredConsensus |
151
            TxStorageResponse::NotStored |
152
            TxStorageResponse::NotStoredFeeTooLow |
153
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
×
154
                location: TxLocation::NotStored as i32,
×
155
                best_block_hash: vec![],
×
156
                confirmations: 0,
×
157
                is_synced,
×
158
                best_block_height: chain_metadata.best_block_height(),
×
159
                mined_timestamp: 0,
×
160
            },
×
161
        };
162
        Ok(mempool_response)
×
163
    }
×
164
}
165

166
#[tari_comms::async_trait]
167
impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpcService<B> {
168
    async fn submit_transaction(
169
        &self,
170
        request: Request<TransactionProto>,
171
    ) -> Result<Response<TxSubmissionResponse>, RpcStatus> {
×
172
        let message = request.into_message();
×
173
        let transaction =
×
174
            Transaction::try_from(message).map_err(|_| RpcStatus::bad_request("Transaction was invalid"))?;
×
175
        let mut mempool = self.mempool();
×
176
        let state_machine = self.state_machine();
×
177

×
178
        // Determine if we are synced
×
179
        let status_watch = state_machine.get_status_info_watch();
×
180
        let is_synced = match (status_watch.borrow()).state_info {
×
181
            StateInfo::Listening(li) => li.is_synced(),
×
182
            _ => false,
×
183
        };
184

185
        let response = match mempool
×
186
            .submit_transaction(transaction.clone())
×
187
            .await
×
188
            .rpc_status_internal_error(LOG_TARGET)?
×
189
        {
190
            TxStorageResponse::UnconfirmedPool => TxSubmissionResponse {
×
191
                accepted: true,
×
192
                rejection_reason: TxSubmissionRejectionReason::None.into(),
×
193
                is_synced,
×
194
            },
×
195

196
            TxStorageResponse::NotStoredOrphan => TxSubmissionResponse {
×
197
                accepted: false,
×
198
                rejection_reason: TxSubmissionRejectionReason::Orphan.into(),
×
199
                is_synced,
×
200
            },
×
201
            TxStorageResponse::NotStoredFeeTooLow => TxSubmissionResponse {
×
202
                accepted: false,
×
203
                rejection_reason: TxSubmissionRejectionReason::FeeTooLow.into(),
×
204
                is_synced,
×
205
            },
×
206
            TxStorageResponse::NotStoredTimeLocked => TxSubmissionResponse {
×
207
                accepted: false,
×
208
                rejection_reason: TxSubmissionRejectionReason::TimeLocked.into(),
×
209
                is_synced,
×
210
            },
×
211
            TxStorageResponse::NotStoredConsensus | TxStorageResponse::NotStored => TxSubmissionResponse {
×
212
                accepted: false,
×
213
                rejection_reason: TxSubmissionRejectionReason::ValidationFailed.into(),
×
214
                is_synced,
×
215
            },
×
216
            TxStorageResponse::NotStoredAlreadySpent |
217
            TxStorageResponse::ReorgPool |
218
            TxStorageResponse::NotStoredAlreadyMined => {
219
                // Is this transaction a double spend or has this transaction been mined?
220
                match transaction.first_kernel_excess_sig() {
×
221
                    None => TxSubmissionResponse {
×
222
                        accepted: false,
×
223
                        rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
×
224
                        is_synced,
×
225
                    },
×
226
                    Some(s) => {
×
227
                        // Check to see if the kernel exists in the blockchain db in which case this exact transaction
×
228
                        // already exists in the chain, otherwise it is a double spend
×
229
                        let db = self.db();
×
230
                        match db
×
231
                            .fetch_kernel_by_excess_sig(s.clone())
×
232
                            .await
×
233
                            .rpc_status_internal_error(LOG_TARGET)?
×
234
                        {
235
                            None => TxSubmissionResponse {
×
236
                                accepted: false,
×
237
                                rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
×
238
                                is_synced,
×
239
                            },
×
240
                            Some(_) => TxSubmissionResponse {
×
241
                                accepted: false,
×
242
                                rejection_reason: TxSubmissionRejectionReason::AlreadyMined.into(),
×
243
                                is_synced,
×
244
                            },
×
245
                        }
246
                    },
247
                }
248
            },
249
        };
250
        Ok(Response::new(response))
×
251
    }
×
252

253
    async fn transaction_query(
254
        &self,
255
        request: Request<SignatureProto>,
256
    ) -> Result<Response<TxQueryResponse>, RpcStatus> {
×
257
        let state_machine = self.state_machine();
×
258

×
259
        // Determine if we are synced
×
260
        let status_watch = state_machine.get_status_info_watch();
×
261
        let is_synced = match status_watch.borrow().state_info {
×
262
            StateInfo::Listening(li) => li.is_synced(),
×
263
            _ => false,
×
264
        };
265

266
        let message = request.into_message();
×
267
        let signature = Signature::try_from(message).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
×
268

269
        let mut response = self.fetch_kernel(signature).await?;
×
270
        response.is_synced = is_synced;
×
271
        Ok(Response::new(response))
×
272
    }
×
273

274
    async fn transaction_batch_query(
275
        &self,
276
        request: Request<SignaturesProto>,
277
    ) -> Result<Response<TxQueryBatchResponses>, RpcStatus> {
×
278
        let state_machine = self.state_machine();
×
279

×
280
        // Determine if we are synced
×
281
        let status_watch = state_machine.get_status_info_watch();
×
282
        let is_synced = match (status_watch.borrow()).state_info {
×
283
            StateInfo::Listening(li) => li.is_synced(),
×
284
            _ => false,
×
285
        };
286

287
        let message = request.into_message();
×
288

×
289
        let mut responses: Vec<TxQueryBatchResponse> = Vec::new();
×
290

291
        let metadata = self
×
292
            .db
×
293
            .get_chain_metadata()
×
294
            .await
×
295
            .rpc_status_internal_error(LOG_TARGET)?;
×
296

297
        for sig in message.sigs {
×
298
            let signature = Signature::try_from(sig).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
×
299
            let response: TxQueryResponse = self.fetch_kernel(signature.clone()).await?;
×
300
            responses.push(TxQueryBatchResponse {
×
301
                signature: Some(SignatureProto::from(&signature)),
×
302
                location: response.location,
×
303
                best_block_hash: response.best_block_hash,
×
304
                confirmations: response.confirmations,
×
305
                best_block_height: response.best_block_height.saturating_sub(response.confirmations),
×
306
                mined_timestamp: response.mined_timestamp,
×
307
            });
×
308
        }
309
        Ok(Response::new(TxQueryBatchResponses {
×
310
            responses,
×
311
            is_synced,
×
312
            best_block_hash: metadata.best_block_hash().to_vec(),
×
313
            best_block_height: metadata.best_block_height(),
×
314
            tip_mined_timestamp: metadata.timestamp(),
×
315
        }))
×
316
    }
×
317

318
    async fn fetch_matching_utxos(
319
        &self,
320
        request: Request<FetchMatchingUtxos>,
321
    ) -> Result<Response<FetchUtxosResponse>, RpcStatus> {
×
322
        let message = request.into_message();
×
323

×
324
        let state_machine = self.state_machine();
×
325
        // Determine if we are synced
×
326
        let status_watch = state_machine.get_status_info_watch();
×
327
        let is_synced = match (status_watch.borrow()).state_info {
×
328
            StateInfo::Listening(li) => li.is_synced(),
×
329
            _ => false,
×
330
        };
331

332
        let db = self.db();
×
333
        let mut res = Vec::with_capacity(message.output_hashes.len());
×
334
        let hashes: Vec<FixedHash> = message
×
335
            .output_hashes
×
336
            .into_iter()
×
337
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
338
            .collect::<Result<_, _>>()
×
339
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
340
        let utxos = db
×
341
            .fetch_outputs_with_spend_status_at_tip(hashes)
×
342
            .await
×
343
            .rpc_status_internal_error(LOG_TARGET)?
×
344
            .into_iter()
×
345
            .flatten();
×
346
        for (output, spent) in utxos {
×
347
            if !spent {
×
348
                res.push(output);
×
349
            }
×
350
        }
351

352
        Ok(Response::new(FetchUtxosResponse {
353
            outputs: res
×
354
                .into_iter()
×
355
                .map(TryInto::try_into)
×
356
                .collect::<Result<Vec<_>, String>>()
×
357
                .map_err(|err| RpcStatus::bad_request(&err))?,
×
358
            is_synced,
×
359
        }))
360
    }
×
361

362
    async fn utxo_query(&self, request: Request<UtxoQueryRequest>) -> Result<Response<UtxoQueryResponses>, RpcStatus> {
×
363
        let message = request.into_message();
×
364
        if message.output_hashes.is_empty() {
×
365
            return Err(RpcStatus::bad_request("Empty output hashes"));
×
366
        }
×
367
        const MAX_ALLOWED_QUERY_SIZE: usize = 512;
368
        if message.output_hashes.len() > MAX_ALLOWED_QUERY_SIZE {
×
369
            return Err(RpcStatus::bad_request(&format!(
×
NEW
370
                "Exceeded maximum allowed query hashes. Max: {MAX_ALLOWED_QUERY_SIZE}"
×
371
            )));
×
372
        }
×
373

×
374
        let db = self.db();
×
375

×
376
        debug!(
×
377
            target: LOG_TARGET,
×
378
            "Querying {} UTXO(s) for mined state",
×
UNCOV
379
            message.output_hashes.len(),
×
380
        );
381
        let hashes: Vec<FixedHash> = message
×
382
            .output_hashes
×
383
            .into_iter()
×
384
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
385
            .collect::<Result<_, _>>()
×
386
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
387
        trace!(
×
388
            target: LOG_TARGET,
×
389
            "UTXO hashes queried from wallet: {:?}",
×
UNCOV
390
            hashes.iter().map(|h| h.to_hex()).collect::<Vec<String>>()
×
391
        );
392

393
        let mined_info_resp = db
×
394
            .fetch_outputs_mined_info(hashes)
×
395
            .await
×
UNCOV
396
            .rpc_status_internal_error(LOG_TARGET)?;
×
397

398
        let num_mined = mined_info_resp.iter().filter(|opt| opt.is_some()).count();
×
399
        debug!(
×
400
            target: LOG_TARGET,
×
401
            "Found {} mined and {} unmined UTXO(s)",
×
402
            num_mined,
×
UNCOV
403
            mined_info_resp.len() - num_mined
×
404
        );
405
        let metadata = self
×
406
            .db
×
407
            .get_chain_metadata()
×
408
            .await
×
UNCOV
409
            .rpc_status_internal_error(LOG_TARGET)?;
×
410

411
        Ok(Response::new(UtxoQueryResponses {
412
            best_block_height: metadata.best_block_height(),
×
413
            best_block_hash: metadata.best_block_hash().to_vec(),
×
414
            responses: mined_info_resp
×
415
                .into_iter()
×
416
                .flatten()
×
417
                .map(|utxo| {
×
418
                    Ok(UtxoQueryResponse {
×
419
                        mined_at_height: utxo.mined_height,
×
420
                        mined_in_block: utxo.header_hash.to_vec(),
×
421
                        output_hash: utxo.output.hash().to_vec(),
×
422
                        output: match utxo.output.try_into() {
×
423
                            Ok(output) => Some(output),
×
424
                            Err(err) => {
×
UNCOV
425
                                return Err(err);
×
426
                            },
427
                        },
UNCOV
428
                        mined_timestamp: utxo.mined_timestamp,
×
429
                    })
430
                })
×
431
                .collect::<Result<Vec<_>, String>>()
×
UNCOV
432
                .map_err(|err| RpcStatus::bad_request(&err))?,
×
433
        }))
UNCOV
434
    }
×
435

436
    async fn query_deleted(
437
        &self,
438
        request: Request<QueryDeletedRequest>,
439
    ) -> Result<Response<QueryDeletedResponse>, RpcStatus> {
×
440
        let message = request.into_message();
×
441
        if message.hashes.len() > MAX_QUERY_DELETED_HASHES {
×
442
            return Err(RpcStatus::bad_request(
×
443
                &"Received more hashes than we allow".to_string(),
×
444
            ));
×
445
        }
×
446
        let chain_include_header = message.chain_must_include_header;
×
447
        if !chain_include_header.is_empty() {
×
448
            let hash = chain_include_header
×
449
                .try_into()
×
450
                .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
451
            if self
×
452
                .db
×
453
                .fetch_header_by_block_hash(hash)
×
454
                .await
×
455
                .rpc_status_internal_error(LOG_TARGET)?
×
UNCOV
456
                .is_none()
×
457
            {
458
                return Err(RpcStatus::not_found(
×
459
                    "Chain does not include header. It might have been reorged out",
×
460
                ));
×
461
            }
×
462
        }
×
463
        let hashes: Vec<FixedHash> = message
×
464
            .hashes
×
465
            .into_iter()
×
466
            .map(|hash| hash.try_into())
×
467
            .collect::<Result<_, _>>()
×
468
            .map_err(|_| RpcStatus::bad_request(&"Malformed utxo hash received".to_string()))?;
×
469
        let mut return_data = Vec::with_capacity(hashes.len());
×
470
        let utxos = self
×
471
            .db
×
472
            .fetch_outputs_mined_info(hashes.clone())
×
473
            .await
×
474
            .rpc_status_internal_error(LOG_TARGET)?;
×
475
        let txos = self
×
476
            .db
×
477
            .fetch_inputs_mined_info(hashes)
×
478
            .await
×
479
            .rpc_status_internal_error(LOG_TARGET)?;
×
480
        if utxos.len() != txos.len() {
×
481
            return Err(RpcStatus::general("database returned different inputs vs outputs"));
×
482
        }
×
483
        for (utxo, txo) in utxos.iter().zip(txos.iter()) {
×
484
            let mut data = match utxo {
×
485
                None => QueryDeletedData {
×
486
                    mined_at_height: 0,
×
487
                    block_mined_in: Vec::new(),
×
488
                    height_deleted_at: 0,
×
489
                    block_deleted_in: Vec::new(),
×
490
                },
×
491
                Some(u) => QueryDeletedData {
×
492
                    mined_at_height: u.mined_height,
×
493
                    block_mined_in: u.header_hash.to_vec(),
×
494
                    height_deleted_at: 0,
×
495
                    block_deleted_in: Vec::new(),
×
UNCOV
496
                },
×
497
            };
498
            if let Some(input) = txo {
×
499
                data.height_deleted_at = input.spent_height;
×
500
                data.block_deleted_in = input.header_hash.to_vec();
×
501
            };
×
UNCOV
502
            return_data.push(data);
×
503
        }
504
        let metadata = self
×
505
            .db
×
506
            .get_chain_metadata()
×
507
            .await
×
UNCOV
508
            .rpc_status_internal_error(LOG_TARGET)?;
×
509

510
        Ok(Response::new(QueryDeletedResponse {
×
511
            best_block_height: metadata.best_block_height(),
×
512
            best_block_hash: metadata.best_block_hash().to_vec(),
×
513
            data: return_data,
×
514
        }))
×
UNCOV
515
    }
×
516

517
    async fn get_tip_info(&self, _request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus> {
×
518
        let state_machine = self.state_machine();
×
519
        let status_watch = state_machine.get_status_info_watch();
×
520
        let is_synced = match status_watch.borrow().state_info {
×
521
            StateInfo::Listening(li) => li.is_synced(),
×
UNCOV
522
            _ => false,
×
523
        };
524

525
        let metadata = self
×
526
            .db
×
527
            .get_chain_metadata()
×
528
            .await
×
UNCOV
529
            .rpc_status_internal_error(LOG_TARGET)?;
×
530

531
        Ok(Response::new(TipInfoResponse {
×
532
            metadata: Some(metadata.into()),
×
533
            is_synced,
×
534
        }))
×
UNCOV
535
    }
×
536

537
    async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
538
        let height = request.into_message();
×
539
        let header = self
×
540
            .db()
×
541
            .fetch_header(height)
×
542
            .await
×
543
            .rpc_status_internal_error(LOG_TARGET)?
×
NEW
544
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {height}")))?;
×
545

546
        Ok(Response::new(header.into()))
×
UNCOV
547
    }
×
548

549
    async fn get_header_by_height(
550
        &self,
551
        request: Request<u64>,
552
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
553
        let height = request.into_message();
×
554
        let header = self
×
555
            .db()
×
556
            .fetch_header(height)
×
557
            .await
×
558
            .rpc_status_internal_error(LOG_TARGET)?
×
NEW
559
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {height}")))?;
×
560

561
        Ok(Response::new(header.into()))
×
UNCOV
562
    }
×
563

564
    async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus> {
×
565
        let requested_epoch_time: u64 = request.into_message();
×
NEW
566
        trace!(target: LOG_TARGET, "requested_epoch_time: {requested_epoch_time}");
×
567
        let tip_header = self
×
568
            .db()
×
569
            .fetch_tip_header()
×
570
            .await
×
UNCOV
571
            .rpc_status_internal_error(LOG_TARGET)?;
×
572

573
        let mut left_height = 0u64;
×
574
        let mut right_height = tip_header.height();
×
575
        trace!(
×
576
            target: LOG_TARGET,
×
UNCOV
577
            "requested_epoch_time: {}, left: {}, right: {}",
×
578
            requested_epoch_time,
579
            left_height,
580
            right_height
581
        );
582

583
        while left_height <= right_height {
×
584
            let mut mid_height = (left_height + right_height) / 2;
×
585

×
586
            if mid_height == 0 {
×
587
                return Ok(Response::new(0u64));
×
588
            }
×
589
            // If the two bounds are adjacent then perform the test between the right and left sides
×
590
            if left_height == mid_height {
×
591
                mid_height = right_height;
×
UNCOV
592
            }
×
593

594
            let mid_header = self
×
595
                .db()
×
596
                .fetch_header(mid_height)
×
597
                .await
×
598
                .rpc_status_internal_error(LOG_TARGET)?
×
599
                .ok_or_else(|| {
×
NEW
600
                    RpcStatus::not_found(&format!("Header not found during search at height {mid_height}"))
×
601
                })?;
×
602
            let before_mid_header = self
×
603
                .db()
×
604
                .fetch_header(mid_height - 1)
×
605
                .await
×
606
                .rpc_status_internal_error(LOG_TARGET)?
×
607
                .ok_or_else(|| {
×
608
                    RpcStatus::not_found(&format!("Header not found during search at height {}", mid_height - 1))
×
609
                })?;
×
610
            trace!(
×
611
                target: LOG_TARGET,
×
612
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
613
                requested_epoch_time,
×
614
                left_height,
×
615
                mid_height,
×
616
                mid_height-1,
×
617
                mid_header.timestamp.as_u64(),
×
UNCOV
618
                before_mid_header.timestamp.as_u64(),
×
619
                right_height
620
            );
621
            if requested_epoch_time < mid_header.timestamp.as_u64() &&
×
UNCOV
622
                requested_epoch_time >= before_mid_header.timestamp.as_u64()
×
623
            {
624
                trace!(
×
625
                    target: LOG_TARGET,
×
UNCOV
626
                    "requested_epoch_time: {}, selected height: {}",
×
627
                    requested_epoch_time, before_mid_header.height
628
                );
629
                return Ok(Response::new(before_mid_header.height));
×
630
            } else if mid_height == right_height {
×
631
                trace!(
×
632
                    target: LOG_TARGET,
×
NEW
633
                    "requested_epoch_time: {requested_epoch_time}, selected height: {right_height}"
×
634
                );
635
                return Ok(Response::new(right_height));
×
636
            } else if requested_epoch_time <= mid_header.timestamp.as_u64() {
×
637
                right_height = mid_height;
×
638
            } else {
×
UNCOV
639
                left_height = mid_height;
×
UNCOV
640
            }
×
641
        }
642

UNCOV
643
        Ok(Response::new(0u64))
×
UNCOV
644
    }
×
645

646
    async fn sync_utxos_by_block(
647
        &self,
648
        request: Request<SyncUtxosByBlockRequest>,
649
    ) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus> {
×
650
        let req = request.message();
×
651
        let peer = request.context().peer_node_id();
×
652
        debug!(
×
653
            target: LOG_TARGET,
×
654
            "Received sync_utxos_by_block request from {} from header {} to {} ",
×
655
            peer,
×
UNCOV
656
            req.start_header_hash.to_hex(),
×
UNCOV
657
            req.end_header_hash.to_hex(),
×
658
        );
659

660
        // Number of blocks to load and push to the stream before loading the next batch. Most blocks have 1 output but
661
        // full blocks will have 500
662
        const BATCH_SIZE: usize = 5;
663
        let (tx, rx) = mpsc::channel(BATCH_SIZE);
×
UNCOV
664
        let task = SyncUtxosByBlockTask::new(self.db());
×
665
        task.run(request.into_message(), tx).await?;
×
666

UNCOV
667
        Ok(Streaming::new(rx))
×
UNCOV
668
    }
×
669

670
    async fn get_mempool_fee_per_gram_stats(
671
        &self,
672
        request: Request<GetMempoolFeePerGramStatsRequest>,
673
    ) -> Result<Response<GetMempoolFeePerGramStatsResponse>, RpcStatus> {
×
674
        let req = request.into_message();
×
UNCOV
675
        let count =
×
676
            usize::try_from(req.count).map_err(|_| RpcStatus::bad_request("count must be less than or equal to 20"))?;
×
677

678
        if count > 20 {
×
UNCOV
679
            return Err(RpcStatus::bad_request("count must be less than or equal to 20"));
×
680
        }
×
681

682
        let metadata = self
×
683
            .db
×
684
            .get_chain_metadata()
×
685
            .await
×
686
            .rpc_status_internal_error(LOG_TARGET)?;
×
687
        let stats = self
×
688
            .mempool()
×
689
            .get_fee_per_gram_stats(count, metadata.best_block_height())
×
UNCOV
690
            .await
×
691
            .rpc_status_internal_error(LOG_TARGET)?;
×
692

UNCOV
693
        Ok(Response::new(stats.into()))
×
UNCOV
694
    }
×
695

696
    async fn get_wallet_query_http_service_address(
697
        &self,
698
        _request: Request<()>,
699
    ) -> Result<Response<GetWalletQueryHttpServiceAddressResponse>, RpcStatus> {
×
700
        Ok(Response::new(GetWalletQueryHttpServiceAddressResponse {
×
701
            http_address: self
×
702
                .wallet_query_service_address
×
703
                .clone()
×
704
                .map(|url| url.to_string())
×
705
                .unwrap_or_default(),
×
UNCOV
706
        }))
×
UNCOV
707
    }
×
708
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc