• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16123384529

07 Jul 2025 05:11PM UTC coverage: 64.327% (-7.6%) from 71.89%
16123384529

push

github

web-flow
chore: new release v4.9.0-pre.0 (#7289)

Description
---
new release esmeralda

77151 of 119935 relevant lines covered (64.33%)

227108.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::convert::{TryFrom, TryInto};
5

6
use log::*;
7
use tari_common_types::types::{FixedHash, Signature};
8
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming};
9
use tari_utilities::hex::Hex;
10
use tokio::sync::mpsc;
11
use url::Url;
12

13
use crate::{
14
    base_node::{
15
        rpc::{sync_utxos_by_block_task::SyncUtxosByBlockTask, BaseNodeWalletService},
16
        state_machine_service::states::StateInfo,
17
        StateMachineHandle,
18
    },
19
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
20
    mempool::{service::MempoolHandle, TxStorageResponse},
21
    proto,
22
    proto::{
23
        base_node::{
24
            FetchMatchingUtxos,
25
            FetchUtxosResponse,
26
            GetMempoolFeePerGramStatsRequest,
27
            GetMempoolFeePerGramStatsResponse,
28
            GetWalletQueryHttpServiceAddressResponse,
29
            QueryDeletedData,
30
            QueryDeletedRequest,
31
            QueryDeletedResponse,
32
            Signatures as SignaturesProto,
33
            SyncUtxosByBlockRequest,
34
            SyncUtxosByBlockResponse,
35
            TipInfoResponse,
36
            TxLocation,
37
            TxQueryBatchResponse,
38
            TxQueryBatchResponses,
39
            TxQueryResponse,
40
            TxSubmissionRejectionReason,
41
            TxSubmissionResponse,
42
            UtxoQueryRequest,
43
            UtxoQueryResponse,
44
            UtxoQueryResponses,
45
        },
46
        types::{Signature as SignatureProto, Transaction as TransactionProto},
47
    },
48
    transactions::transaction_components::Transaction,
49
};
50

51
/// Log target passed to the logging macros and RPC error helpers in this file.
const LOG_TARGET: &str = "c::base_node::rpc";
/// Maximum number of hashes a single `query_deleted` request may carry before it is rejected.
const MAX_QUERY_DELETED_HASHES: usize = 1000;
53

54
pub struct BaseNodeWalletRpcService<B> {
55
    db: AsyncBlockchainDb<B>,
56
    mempool: MempoolHandle,
57
    state_machine: StateMachineHandle,
58
    wallet_query_service_address: Option<Url>,
59
}
60

61
impl<B: BlockchainBackend + 'static> BaseNodeWalletRpcService<B> {
62
    pub fn new(
×
63
        db: AsyncBlockchainDb<B>,
×
64
        mempool: MempoolHandle,
×
65
        state_machine: StateMachineHandle,
×
66
        wallet_query_service_address: Option<Url>,
×
67
    ) -> Self {
×
68
        Self {
×
69
            db,
×
70
            mempool,
×
71
            state_machine,
×
72
            wallet_query_service_address,
×
73
        }
×
74
    }
×
75

76
    #[inline]
77
    fn db(&self) -> AsyncBlockchainDb<B> {
×
78
        self.db.clone()
×
79
    }
×
80

81
    #[inline]
82
    pub fn mempool(&self) -> MempoolHandle {
×
83
        self.mempool.clone()
×
84
    }
×
85

86
    #[inline]
87
    pub fn state_machine(&self) -> StateMachineHandle {
×
88
        self.state_machine.clone()
×
89
    }
×
90

91
    async fn fetch_kernel(&self, signature: Signature) -> Result<TxQueryResponse, RpcStatus> {
×
92
        let db = self.db();
×
93
        let chain_metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;
×
94
        let state_machine = self.state_machine();
×
95

×
96
        // Determine if we are synced
×
97
        let status_watch = state_machine.get_status_info_watch();
×
98
        let is_synced = match (status_watch.borrow()).state_info {
×
99
            StateInfo::Listening(li) => li.is_synced(),
×
100
            _ => false,
×
101
        };
102
        match db
×
103
            .fetch_kernel_by_excess_sig(signature.clone())
×
104
            .await
×
105
            .rpc_status_internal_error(LOG_TARGET)?
×
106
        {
107
            None => (),
×
108
            Some((_, block_hash)) => {
×
109
                match db
×
110
                    .fetch_header_by_block_hash(block_hash)
×
111
                    .await
×
112
                    .rpc_status_internal_error(LOG_TARGET)?
×
113
                {
114
                    None => (),
×
115
                    Some(header) => {
×
116
                        let confirmations = chain_metadata.best_block_height().saturating_sub(header.height);
×
117
                        let response = TxQueryResponse {
×
118
                            location: TxLocation::Mined as i32,
×
119
                            best_block_hash: block_hash.to_vec(),
×
120
                            confirmations,
×
121
                            is_synced,
×
122
                            best_block_height: chain_metadata.best_block_height(),
×
123
                            mined_timestamp: header.timestamp.as_u64(),
×
124
                        };
×
125
                        return Ok(response);
×
126
                    },
127
                }
128
            },
129
        };
130

131
        // If not in a block then check the mempool
132
        let mut mempool = self.mempool();
×
133
        let mempool_response = match mempool
×
134
            .get_tx_state_by_excess_sig(signature.clone())
×
135
            .await
×
136
            .rpc_status_internal_error(LOG_TARGET)?
×
137
        {
138
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
×
139
                location: TxLocation::InMempool as i32,
×
140
                best_block_hash: vec![],
×
141
                confirmations: 0,
×
142
                is_synced,
×
143
                best_block_height: chain_metadata.best_block_height(),
×
144
                mined_timestamp: 0,
×
145
            },
×
146
            TxStorageResponse::ReorgPool |
147
            TxStorageResponse::NotStoredOrphan |
148
            TxStorageResponse::NotStoredTimeLocked |
149
            TxStorageResponse::NotStoredAlreadySpent |
150
            TxStorageResponse::NotStoredConsensus |
151
            TxStorageResponse::NotStored |
152
            TxStorageResponse::NotStoredFeeTooLow |
153
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
×
154
                location: TxLocation::NotStored as i32,
×
155
                best_block_hash: vec![],
×
156
                confirmations: 0,
×
157
                is_synced,
×
158
                best_block_height: chain_metadata.best_block_height(),
×
159
                mined_timestamp: 0,
×
160
            },
×
161
        };
162
        Ok(mempool_response)
×
163
    }
×
164
}
165

166
#[tari_comms::async_trait]
167
impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpcService<B> {
168
    async fn submit_transaction(
169
        &self,
170
        request: Request<TransactionProto>,
171
    ) -> Result<Response<TxSubmissionResponse>, RpcStatus> {
×
172
        let message = request.into_message();
×
173
        let transaction =
×
174
            Transaction::try_from(message).map_err(|_| RpcStatus::bad_request("Transaction was invalid"))?;
×
175
        let mut mempool = self.mempool();
×
176
        let state_machine = self.state_machine();
×
177

×
178
        // Determine if we are synced
×
179
        let status_watch = state_machine.get_status_info_watch();
×
180
        let is_synced = match (status_watch.borrow()).state_info {
×
181
            StateInfo::Listening(li) => li.is_synced(),
×
182
            _ => false,
×
183
        };
184

185
        let response = match mempool
×
186
            .submit_transaction(transaction.clone())
×
187
            .await
×
188
            .rpc_status_internal_error(LOG_TARGET)?
×
189
        {
190
            TxStorageResponse::UnconfirmedPool => TxSubmissionResponse {
×
191
                accepted: true,
×
192
                rejection_reason: TxSubmissionRejectionReason::None.into(),
×
193
                is_synced,
×
194
            },
×
195

196
            TxStorageResponse::NotStoredOrphan => TxSubmissionResponse {
×
197
                accepted: false,
×
198
                rejection_reason: TxSubmissionRejectionReason::Orphan.into(),
×
199
                is_synced,
×
200
            },
×
201
            TxStorageResponse::NotStoredFeeTooLow => TxSubmissionResponse {
×
202
                accepted: false,
×
203
                rejection_reason: TxSubmissionRejectionReason::FeeTooLow.into(),
×
204
                is_synced,
×
205
            },
×
206
            TxStorageResponse::NotStoredTimeLocked => TxSubmissionResponse {
×
207
                accepted: false,
×
208
                rejection_reason: TxSubmissionRejectionReason::TimeLocked.into(),
×
209
                is_synced,
×
210
            },
×
211
            TxStorageResponse::NotStoredConsensus | TxStorageResponse::NotStored => TxSubmissionResponse {
×
212
                accepted: false,
×
213
                rejection_reason: TxSubmissionRejectionReason::ValidationFailed.into(),
×
214
                is_synced,
×
215
            },
×
216
            TxStorageResponse::NotStoredAlreadySpent |
217
            TxStorageResponse::ReorgPool |
218
            TxStorageResponse::NotStoredAlreadyMined => {
219
                // Is this transaction a double spend or has this transaction been mined?
220
                match transaction.first_kernel_excess_sig() {
×
221
                    None => TxSubmissionResponse {
×
222
                        accepted: false,
×
223
                        rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
×
224
                        is_synced,
×
225
                    },
×
226
                    Some(s) => {
×
227
                        // Check to see if the kernel exists in the blockchain db in which case this exact transaction
×
228
                        // already exists in the chain, otherwise it is a double spend
×
229
                        let db = self.db();
×
230
                        match db
×
231
                            .fetch_kernel_by_excess_sig(s.clone())
×
232
                            .await
×
233
                            .rpc_status_internal_error(LOG_TARGET)?
×
234
                        {
235
                            None => TxSubmissionResponse {
×
236
                                accepted: false,
×
237
                                rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
×
238
                                is_synced,
×
239
                            },
×
240
                            Some(_) => TxSubmissionResponse {
×
241
                                accepted: false,
×
242
                                rejection_reason: TxSubmissionRejectionReason::AlreadyMined.into(),
×
243
                                is_synced,
×
244
                            },
×
245
                        }
246
                    },
247
                }
248
            },
249
        };
250
        Ok(Response::new(response))
×
251
    }
×
252

253
    async fn transaction_query(
254
        &self,
255
        request: Request<SignatureProto>,
256
    ) -> Result<Response<TxQueryResponse>, RpcStatus> {
×
257
        let state_machine = self.state_machine();
×
258

×
259
        // Determine if we are synced
×
260
        let status_watch = state_machine.get_status_info_watch();
×
261
        let is_synced = match status_watch.borrow().state_info {
×
262
            StateInfo::Listening(li) => li.is_synced(),
×
263
            _ => false,
×
264
        };
265

266
        let message = request.into_message();
×
267
        let signature = Signature::try_from(message).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
×
268

269
        let mut response = self.fetch_kernel(signature).await?;
×
270
        response.is_synced = is_synced;
×
271
        Ok(Response::new(response))
×
272
    }
×
273

274
    async fn transaction_batch_query(
275
        &self,
276
        request: Request<SignaturesProto>,
277
    ) -> Result<Response<TxQueryBatchResponses>, RpcStatus> {
×
278
        let state_machine = self.state_machine();
×
279

×
280
        // Determine if we are synced
×
281
        let status_watch = state_machine.get_status_info_watch();
×
282
        let is_synced = match (status_watch.borrow()).state_info {
×
283
            StateInfo::Listening(li) => li.is_synced(),
×
284
            _ => false,
×
285
        };
286

287
        let message = request.into_message();
×
288

×
289
        let mut responses: Vec<TxQueryBatchResponse> = Vec::new();
×
290

291
        let metadata = self
×
292
            .db
×
293
            .get_chain_metadata()
×
294
            .await
×
295
            .rpc_status_internal_error(LOG_TARGET)?;
×
296

297
        for sig in message.sigs {
×
298
            let signature = Signature::try_from(sig).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
×
299
            let response: TxQueryResponse = self.fetch_kernel(signature.clone()).await?;
×
300
            responses.push(TxQueryBatchResponse {
×
301
                signature: Some(SignatureProto::from(&signature)),
×
302
                location: response.location,
×
303
                best_block_hash: response.best_block_hash,
×
304
                confirmations: response.confirmations,
×
305
                best_block_height: response.best_block_height.saturating_sub(response.confirmations),
×
306
                mined_timestamp: response.mined_timestamp,
×
307
            });
×
308
        }
309
        Ok(Response::new(TxQueryBatchResponses {
×
310
            responses,
×
311
            is_synced,
×
312
            best_block_hash: metadata.best_block_hash().to_vec(),
×
313
            best_block_height: metadata.best_block_height(),
×
314
            tip_mined_timestamp: metadata.timestamp(),
×
315
        }))
×
316
    }
×
317

318
    async fn fetch_matching_utxos(
319
        &self,
320
        request: Request<FetchMatchingUtxos>,
321
    ) -> Result<Response<FetchUtxosResponse>, RpcStatus> {
×
322
        let message = request.into_message();
×
323

×
324
        let state_machine = self.state_machine();
×
325
        // Determine if we are synced
×
326
        let status_watch = state_machine.get_status_info_watch();
×
327
        let is_synced = match (status_watch.borrow()).state_info {
×
328
            StateInfo::Listening(li) => li.is_synced(),
×
329
            _ => false,
×
330
        };
331

332
        let db = self.db();
×
333
        let mut res = Vec::with_capacity(message.output_hashes.len());
×
334
        let hashes: Vec<FixedHash> = message
×
335
            .output_hashes
×
336
            .into_iter()
×
337
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
338
            .collect::<Result<_, _>>()
×
339
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
340
        let utxos = db
×
341
            .fetch_outputs_with_spend_status_at_tip(hashes)
×
342
            .await
×
343
            .rpc_status_internal_error(LOG_TARGET)?
×
344
            .into_iter()
×
345
            .flatten();
×
346
        for (output, spent) in utxos {
×
347
            if !spent {
×
348
                res.push(output);
×
349
            }
×
350
        }
351

352
        Ok(Response::new(FetchUtxosResponse {
353
            outputs: res
×
354
                .into_iter()
×
355
                .map(TryInto::try_into)
×
356
                .collect::<Result<Vec<_>, String>>()
×
357
                .map_err(|err| RpcStatus::bad_request(&err))?,
×
358
            is_synced,
×
359
        }))
360
    }
×
361

362
    async fn utxo_query(&self, request: Request<UtxoQueryRequest>) -> Result<Response<UtxoQueryResponses>, RpcStatus> {
×
363
        let message = request.into_message();
×
364
        if message.output_hashes.is_empty() {
×
365
            return Err(RpcStatus::bad_request("Empty output hashes"));
×
366
        }
×
367
        const MAX_ALLOWED_QUERY_SIZE: usize = 512;
368
        if message.output_hashes.len() > MAX_ALLOWED_QUERY_SIZE {
×
369
            return Err(RpcStatus::bad_request(&format!(
×
370
                "Exceeded maximum allowed query hashes. Max: {}",
×
371
                MAX_ALLOWED_QUERY_SIZE
×
372
            )));
×
373
        }
×
374

×
375
        let db = self.db();
×
376

×
377
        debug!(
×
378
            target: LOG_TARGET,
×
379
            "Querying {} UTXO(s) for mined state",
×
380
            message.output_hashes.len(),
×
381
        );
382
        let hashes: Vec<FixedHash> = message
×
383
            .output_hashes
×
384
            .into_iter()
×
385
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
386
            .collect::<Result<_, _>>()
×
387
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
388
        trace!(
×
389
            target: LOG_TARGET,
×
390
            "UTXO hashes queried from wallet: {:?}",
×
391
            hashes.iter().map(|h| h.to_hex()).collect::<Vec<String>>()
×
392
        );
393

394
        let mined_info_resp = db
×
395
            .fetch_outputs_mined_info(hashes)
×
396
            .await
×
397
            .rpc_status_internal_error(LOG_TARGET)?;
×
398

399
        let num_mined = mined_info_resp.iter().filter(|opt| opt.is_some()).count();
×
400
        debug!(
×
401
            target: LOG_TARGET,
×
402
            "Found {} mined and {} unmined UTXO(s)",
×
403
            num_mined,
×
404
            mined_info_resp.len() - num_mined
×
405
        );
406
        let metadata = self
×
407
            .db
×
408
            .get_chain_metadata()
×
409
            .await
×
410
            .rpc_status_internal_error(LOG_TARGET)?;
×
411

412
        Ok(Response::new(UtxoQueryResponses {
413
            best_block_height: metadata.best_block_height(),
×
414
            best_block_hash: metadata.best_block_hash().to_vec(),
×
415
            responses: mined_info_resp
×
416
                .into_iter()
×
417
                .flatten()
×
418
                .map(|utxo| {
×
419
                    Ok(UtxoQueryResponse {
×
420
                        mined_at_height: utxo.mined_height,
×
421
                        mined_in_block: utxo.header_hash.to_vec(),
×
422
                        output_hash: utxo.output.hash().to_vec(),
×
423
                        output: match utxo.output.try_into() {
×
424
                            Ok(output) => Some(output),
×
425
                            Err(err) => {
×
426
                                return Err(err);
×
427
                            },
428
                        },
429
                        mined_timestamp: utxo.mined_timestamp,
×
430
                    })
431
                })
×
432
                .collect::<Result<Vec<_>, String>>()
×
433
                .map_err(|err| RpcStatus::bad_request(&err))?,
×
434
        }))
435
    }
×
436

437
    async fn query_deleted(
438
        &self,
439
        request: Request<QueryDeletedRequest>,
440
    ) -> Result<Response<QueryDeletedResponse>, RpcStatus> {
×
441
        let message = request.into_message();
×
442
        if message.hashes.len() > MAX_QUERY_DELETED_HASHES {
×
443
            return Err(RpcStatus::bad_request(
×
444
                &"Received more hashes than we allow".to_string(),
×
445
            ));
×
446
        }
×
447
        let chain_include_header = message.chain_must_include_header;
×
448
        if !chain_include_header.is_empty() {
×
449
            let hash = chain_include_header
×
450
                .try_into()
×
451
                .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
452
            if self
×
453
                .db
×
454
                .fetch_header_by_block_hash(hash)
×
455
                .await
×
456
                .rpc_status_internal_error(LOG_TARGET)?
×
457
                .is_none()
×
458
            {
459
                return Err(RpcStatus::not_found(
×
460
                    "Chain does not include header. It might have been reorged out",
×
461
                ));
×
462
            }
×
463
        }
×
464
        let hashes: Vec<FixedHash> = message
×
465
            .hashes
×
466
            .into_iter()
×
467
            .map(|hash| hash.try_into())
×
468
            .collect::<Result<_, _>>()
×
469
            .map_err(|_| RpcStatus::bad_request(&"Malformed utxo hash received".to_string()))?;
×
470
        let mut return_data = Vec::with_capacity(hashes.len());
×
471
        let utxos = self
×
472
            .db
×
473
            .fetch_outputs_mined_info(hashes.clone())
×
474
            .await
×
475
            .rpc_status_internal_error(LOG_TARGET)?;
×
476
        let txos = self
×
477
            .db
×
478
            .fetch_inputs_mined_info(hashes)
×
479
            .await
×
480
            .rpc_status_internal_error(LOG_TARGET)?;
×
481
        if utxos.len() != txos.len() {
×
482
            return Err(RpcStatus::general("database returned different inputs vs outputs"));
×
483
        }
×
484
        for (utxo, txo) in utxos.iter().zip(txos.iter()) {
×
485
            let mut data = match utxo {
×
486
                None => QueryDeletedData {
×
487
                    mined_at_height: 0,
×
488
                    block_mined_in: Vec::new(),
×
489
                    height_deleted_at: 0,
×
490
                    block_deleted_in: Vec::new(),
×
491
                },
×
492
                Some(u) => QueryDeletedData {
×
493
                    mined_at_height: u.mined_height,
×
494
                    block_mined_in: u.header_hash.to_vec(),
×
495
                    height_deleted_at: 0,
×
496
                    block_deleted_in: Vec::new(),
×
497
                },
×
498
            };
499
            if let Some(input) = txo {
×
500
                data.height_deleted_at = input.spent_height;
×
501
                data.block_deleted_in = input.header_hash.to_vec();
×
502
            };
×
503
            return_data.push(data);
×
504
        }
505
        let metadata = self
×
506
            .db
×
507
            .get_chain_metadata()
×
508
            .await
×
509
            .rpc_status_internal_error(LOG_TARGET)?;
×
510

511
        Ok(Response::new(QueryDeletedResponse {
×
512
            best_block_height: metadata.best_block_height(),
×
513
            best_block_hash: metadata.best_block_hash().to_vec(),
×
514
            data: return_data,
×
515
        }))
×
516
    }
×
517

518
    async fn get_tip_info(&self, _request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus> {
×
519
        let state_machine = self.state_machine();
×
520
        let status_watch = state_machine.get_status_info_watch();
×
521
        let is_synced = match status_watch.borrow().state_info {
×
522
            StateInfo::Listening(li) => li.is_synced(),
×
523
            _ => false,
×
524
        };
525

526
        let metadata = self
×
527
            .db
×
528
            .get_chain_metadata()
×
529
            .await
×
530
            .rpc_status_internal_error(LOG_TARGET)?;
×
531

532
        Ok(Response::new(TipInfoResponse {
×
533
            metadata: Some(metadata.into()),
×
534
            is_synced,
×
535
        }))
×
536
    }
×
537

538
    async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
539
        let height = request.into_message();
×
540
        let header = self
×
541
            .db()
×
542
            .fetch_header(height)
×
543
            .await
×
544
            .rpc_status_internal_error(LOG_TARGET)?
×
545
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {}", height)))?;
×
546

547
        Ok(Response::new(header.into()))
×
548
    }
×
549

550
    async fn get_header_by_height(
551
        &self,
552
        request: Request<u64>,
553
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
554
        let height = request.into_message();
×
555
        let header = self
×
556
            .db()
×
557
            .fetch_header(height)
×
558
            .await
×
559
            .rpc_status_internal_error(LOG_TARGET)?
×
560
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {}", height)))?;
×
561

562
        Ok(Response::new(header.into()))
×
563
    }
×
564

565
    async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus> {
×
566
        let requested_epoch_time: u64 = request.into_message();
×
567
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", requested_epoch_time);
×
568
        let tip_header = self
×
569
            .db()
×
570
            .fetch_tip_header()
×
571
            .await
×
572
            .rpc_status_internal_error(LOG_TARGET)?;
×
573

574
        let mut left_height = 0u64;
×
575
        let mut right_height = tip_header.height();
×
576
        trace!(
×
577
            target: LOG_TARGET,
×
578
            "requested_epoch_time: {}, left: {}, right: {}",
×
579
            requested_epoch_time,
580
            left_height,
581
            right_height
582
        );
583

584
        while left_height <= right_height {
×
585
            let mut mid_height = (left_height + right_height) / 2;
×
586

×
587
            if mid_height == 0 {
×
588
                return Ok(Response::new(0u64));
×
589
            }
×
590
            // If the two bounds are adjacent then perform the test between the right and left sides
×
591
            if left_height == mid_height {
×
592
                mid_height = right_height;
×
593
            }
×
594

595
            let mid_header = self
×
596
                .db()
×
597
                .fetch_header(mid_height)
×
598
                .await
×
599
                .rpc_status_internal_error(LOG_TARGET)?
×
600
                .ok_or_else(|| {
×
601
                    RpcStatus::not_found(&format!("Header not found during search at height {}", mid_height))
×
602
                })?;
×
603
            let before_mid_header = self
×
604
                .db()
×
605
                .fetch_header(mid_height - 1)
×
606
                .await
×
607
                .rpc_status_internal_error(LOG_TARGET)?
×
608
                .ok_or_else(|| {
×
609
                    RpcStatus::not_found(&format!("Header not found during search at height {}", mid_height - 1))
×
610
                })?;
×
611
            trace!(
×
612
                target: LOG_TARGET,
×
613
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
614
                requested_epoch_time,
×
615
                left_height,
×
616
                mid_height,
×
617
                mid_height-1,
×
618
                mid_header.timestamp.as_u64(),
×
619
                before_mid_header.timestamp.as_u64(),
×
620
                right_height
621
            );
622
            if requested_epoch_time < mid_header.timestamp.as_u64() &&
×
623
                requested_epoch_time >= before_mid_header.timestamp.as_u64()
×
624
            {
625
                trace!(
×
626
                    target: LOG_TARGET,
×
627
                    "requested_epoch_time: {}, selected height: {}",
×
628
                    requested_epoch_time, before_mid_header.height
629
                );
630
                return Ok(Response::new(before_mid_header.height));
×
631
            } else if mid_height == right_height {
×
632
                trace!(
×
633
                    target: LOG_TARGET,
×
634
                    "requested_epoch_time: {}, selected height: {}",
×
635
                    requested_epoch_time, right_height
636
                );
637
                return Ok(Response::new(right_height));
×
638
            } else if requested_epoch_time <= mid_header.timestamp.as_u64() {
×
639
                right_height = mid_height;
×
640
            } else {
×
641
                left_height = mid_height;
×
642
            }
×
643
        }
644

645
        Ok(Response::new(0u64))
×
646
    }
×
647

648
    async fn sync_utxos_by_block(
649
        &self,
650
        request: Request<SyncUtxosByBlockRequest>,
651
    ) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus> {
×
652
        let req = request.message();
×
653
        let peer = request.context().peer_node_id();
×
654
        debug!(
×
655
            target: LOG_TARGET,
×
656
            "Received sync_utxos_by_block request from {} from header {} to {} ",
×
657
            peer,
×
658
            req.start_header_hash.to_hex(),
×
659
            req.end_header_hash.to_hex(),
×
660
        );
661

662
        // Number of blocks to load and push to the stream before loading the next batch. Most blocks have 1 output but
663
        // full blocks will have 500
664
        const BATCH_SIZE: usize = 5;
665
        let (tx, rx) = mpsc::channel(BATCH_SIZE);
×
666
        let task = SyncUtxosByBlockTask::new(self.db());
×
667
        task.run(request.into_message(), tx).await?;
×
668

669
        Ok(Streaming::new(rx))
×
670
    }
×
671

672
    async fn get_mempool_fee_per_gram_stats(
673
        &self,
674
        request: Request<GetMempoolFeePerGramStatsRequest>,
675
    ) -> Result<Response<GetMempoolFeePerGramStatsResponse>, RpcStatus> {
×
676
        let req = request.into_message();
×
677
        let count =
×
678
            usize::try_from(req.count).map_err(|_| RpcStatus::bad_request("count must be less than or equal to 20"))?;
×
679

680
        if count > 20 {
×
681
            return Err(RpcStatus::bad_request("count must be less than or equal to 20"));
×
682
        }
×
683

684
        let metadata = self
×
685
            .db
×
686
            .get_chain_metadata()
×
687
            .await
×
688
            .rpc_status_internal_error(LOG_TARGET)?;
×
689
        let stats = self
×
690
            .mempool()
×
691
            .get_fee_per_gram_stats(count, metadata.best_block_height())
×
692
            .await
×
693
            .rpc_status_internal_error(LOG_TARGET)?;
×
694

695
        Ok(Response::new(stats.into()))
×
696
    }
×
697

698
    async fn get_wallet_query_http_service_address(
699
        &self,
700
        _request: Request<()>,
701
    ) -> Result<Response<GetWalletQueryHttpServiceAddressResponse>, RpcStatus> {
×
702
        Ok(Response::new(GetWalletQueryHttpServiceAddressResponse {
×
703
            http_address: self
×
704
                .wallet_query_service_address
×
705
                .clone()
×
706
                .map(|url| url.to_string())
×
707
                .unwrap_or_default(),
×
708
        }))
×
709
    }
×
710
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc