• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 18097567115

29 Sep 2025 12:50PM UTC coverage: 58.554% (-2.3%) from 60.88%
18097567115

push

github

web-flow
chore(ci): switch rust toolchain to stable (#7524)

Description
Switch the Rust toolchain used by CI to stable.

Motivation and Context
Use the stable Rust toolchain.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Chores**
  * Standardized Rust toolchain on stable across CI workflows for more predictable builds.
  * Streamlined setup by removing unnecessary components and aligning toolchain configuration with environment variables.
  * Enabled an environment flag to improve rustup behavior during CI.
  * Improved coverage workflow consistency with dynamic toolchain selection.

* **Tests**
  * Removed nightly-only requirements, simplifying test commands and improving compatibility.
  * Expanded CI triggers to include ci-* branches for better pre-merge validation.
  * Maintained existing job logic while improving reliability and maintainability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

66336 of 113291 relevant lines covered (58.55%)

551641.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::convert::{TryFrom, TryInto};
5

6
use log::*;
7
use tari_common_types::types::{CompressedSignature, FixedHash};
8
use tari_comms::protocol::rpc::{Request, Response, RpcStatus, RpcStatusResultExt, Streaming};
9
use tari_transaction_components::transaction_components::Transaction;
10
use tari_utilities::hex::Hex;
11
use tokio::sync::mpsc;
12
use url::Url;
13

14
use crate::{
15
    base_node::{
16
        rpc::{sync_utxos_by_block_task::SyncUtxosByBlockTask, BaseNodeWalletService},
17
        state_machine_service::states::StateInfo,
18
        StateMachineHandle,
19
    },
20
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
21
    mempool::{service::MempoolHandle, TxStorageResponse},
22
    proto,
23
    proto::{
24
        base_node::{
25
            FetchMatchingUtxos,
26
            FetchUtxosResponse,
27
            GetMempoolFeePerGramStatsRequest,
28
            GetMempoolFeePerGramStatsResponse,
29
            GetWalletQueryHttpServiceAddressResponse,
30
            QueryDeletedData,
31
            QueryDeletedRequest,
32
            QueryDeletedResponse,
33
            Signatures as SignaturesProto,
34
            SyncUtxosByBlockRequest,
35
            SyncUtxosByBlockResponse,
36
            TipInfoResponse,
37
            TxLocation,
38
            TxQueryBatchResponse,
39
            TxQueryBatchResponses,
40
            TxQueryResponse,
41
            TxSubmissionRejectionReason,
42
            TxSubmissionResponse,
43
            UtxoQueryRequest,
44
            UtxoQueryResponse,
45
            UtxoQueryResponses,
46
        },
47
        types::{Signature as SignatureProto, Transaction as TransactionProto},
48
    },
49
};
50

51
const LOG_TARGET: &str = "c::base_node::rpc";
52
const MAX_QUERY_DELETED_HASHES: usize = 1000;
53

54
pub struct BaseNodeWalletRpcService<B> {
55
    db: AsyncBlockchainDb<B>,
56
    mempool: MempoolHandle,
57
    state_machine: StateMachineHandle,
58
    wallet_query_service_address: Option<Url>,
59
}
60

61
impl<B: BlockchainBackend + 'static> BaseNodeWalletRpcService<B> {
62
    pub fn new(
×
63
        db: AsyncBlockchainDb<B>,
×
64
        mempool: MempoolHandle,
×
65
        state_machine: StateMachineHandle,
×
66
        wallet_query_service_address: Option<Url>,
×
67
    ) -> Self {
×
68
        Self {
×
69
            db,
×
70
            mempool,
×
71
            state_machine,
×
72
            wallet_query_service_address,
×
73
        }
×
74
    }
×
75

76
    #[inline]
77
    fn db(&self) -> AsyncBlockchainDb<B> {
×
78
        self.db.clone()
×
79
    }
×
80

81
    #[inline]
82
    pub fn mempool(&self) -> MempoolHandle {
×
83
        self.mempool.clone()
×
84
    }
×
85

86
    #[inline]
87
    pub fn state_machine(&self) -> StateMachineHandle {
×
88
        self.state_machine.clone()
×
89
    }
×
90

91
    async fn fetch_kernel(&self, signature: CompressedSignature) -> Result<TxQueryResponse, RpcStatus> {
×
92
        let db = self.db();
×
93
        let chain_metadata = db.get_chain_metadata().await.rpc_status_internal_error(LOG_TARGET)?;
×
94
        let state_machine = self.state_machine();
×
95

96
        // Determine if we are synced
97
        let status_watch = state_machine.get_status_info_watch();
×
98
        let is_synced = match (status_watch.borrow()).state_info {
×
99
            StateInfo::Listening(li) => li.is_synced(),
×
100
            _ => false,
×
101
        };
102
        match db
×
103
            .fetch_kernel_by_excess_sig(signature.clone())
×
104
            .await
×
105
            .rpc_status_internal_error(LOG_TARGET)?
×
106
        {
107
            None => (),
×
108
            Some((_, block_hash)) => {
×
109
                match db
×
110
                    .fetch_header_by_block_hash(block_hash)
×
111
                    .await
×
112
                    .rpc_status_internal_error(LOG_TARGET)?
×
113
                {
114
                    None => (),
×
115
                    Some(header) => {
×
116
                        let confirmations = chain_metadata.best_block_height().saturating_sub(header.height);
×
117
                        let response = TxQueryResponse {
×
118
                            location: TxLocation::Mined as i32,
×
119
                            best_block_hash: block_hash.to_vec(),
×
120
                            confirmations,
×
121
                            is_synced,
×
122
                            best_block_height: chain_metadata.best_block_height(),
×
123
                            mined_timestamp: header.timestamp.as_u64(),
×
124
                        };
×
125
                        return Ok(response);
×
126
                    },
127
                }
128
            },
129
        };
130

131
        // If not in a block then check the mempool
132
        let mut mempool = self.mempool();
×
133
        let mempool_response = match mempool
×
134
            .get_tx_state_by_excess_sig(signature.clone())
×
135
            .await
×
136
            .rpc_status_internal_error(LOG_TARGET)?
×
137
        {
138
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
×
139
                location: TxLocation::InMempool as i32,
×
140
                best_block_hash: vec![],
×
141
                confirmations: 0,
×
142
                is_synced,
×
143
                best_block_height: chain_metadata.best_block_height(),
×
144
                mined_timestamp: 0,
×
145
            },
×
146
            TxStorageResponse::ReorgPool |
147
            TxStorageResponse::NotStoredOrphan |
148
            TxStorageResponse::NotStoredTimeLocked |
149
            TxStorageResponse::NotStoredAlreadySpent |
150
            TxStorageResponse::NotStoredConsensus |
151
            TxStorageResponse::NotStored |
152
            TxStorageResponse::NotStoredFeeTooLow |
153
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
×
154
                location: TxLocation::NotStored as i32,
×
155
                best_block_hash: vec![],
×
156
                confirmations: 0,
×
157
                is_synced,
×
158
                best_block_height: chain_metadata.best_block_height(),
×
159
                mined_timestamp: 0,
×
160
            },
×
161
        };
162
        Ok(mempool_response)
×
163
    }
×
164
}
165

166
#[tari_comms::async_trait]
167
impl<B: BlockchainBackend + 'static> BaseNodeWalletService for BaseNodeWalletRpcService<B> {
168
    async fn submit_transaction(
169
        &self,
170
        request: Request<TransactionProto>,
171
    ) -> Result<Response<TxSubmissionResponse>, RpcStatus> {
×
172
        let message = request.into_message();
×
173
        let transaction =
×
174
            Transaction::try_from(message).map_err(|_| RpcStatus::bad_request("Transaction was invalid"))?;
×
175
        let mut mempool = self.mempool();
×
176
        let state_machine = self.state_machine();
×
177

178
        // Determine if we are synced
179
        let status_watch = state_machine.get_status_info_watch();
×
180
        let is_synced = match (status_watch.borrow()).state_info {
×
181
            StateInfo::Listening(li) => li.is_synced(),
×
182
            _ => false,
×
183
        };
184

185
        let response = match mempool
×
186
            .submit_transaction(transaction.clone())
×
187
            .await
×
188
            .rpc_status_internal_error(LOG_TARGET)?
×
189
        {
190
            TxStorageResponse::UnconfirmedPool => TxSubmissionResponse {
×
191
                accepted: true,
×
192
                rejection_reason: TxSubmissionRejectionReason::None.into(),
×
193
                is_synced,
×
194
            },
×
195

196
            TxStorageResponse::NotStoredOrphan => TxSubmissionResponse {
×
197
                accepted: false,
×
198
                rejection_reason: TxSubmissionRejectionReason::Orphan.into(),
×
199
                is_synced,
×
200
            },
×
201
            TxStorageResponse::NotStoredFeeTooLow => TxSubmissionResponse {
×
202
                accepted: false,
×
203
                rejection_reason: TxSubmissionRejectionReason::FeeTooLow.into(),
×
204
                is_synced,
×
205
            },
×
206
            TxStorageResponse::NotStoredTimeLocked => TxSubmissionResponse {
×
207
                accepted: false,
×
208
                rejection_reason: TxSubmissionRejectionReason::TimeLocked.into(),
×
209
                is_synced,
×
210
            },
×
211
            TxStorageResponse::NotStoredConsensus | TxStorageResponse::NotStored => TxSubmissionResponse {
×
212
                accepted: false,
×
213
                rejection_reason: TxSubmissionRejectionReason::ValidationFailed.into(),
×
214
                is_synced,
×
215
            },
×
216
            TxStorageResponse::NotStoredAlreadySpent |
217
            TxStorageResponse::ReorgPool |
218
            TxStorageResponse::NotStoredAlreadyMined => {
219
                // Is this transaction a double spend or has this transaction been mined?
220
                match transaction.first_kernel_excess_sig() {
×
221
                    None => TxSubmissionResponse {
×
222
                        accepted: false,
×
223
                        rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
×
224
                        is_synced,
×
225
                    },
×
226
                    Some(s) => {
×
227
                        // Check to see if the kernel exists in the blockchain db in which case this exact transaction
228
                        // already exists in the chain, otherwise it is a double spend
229
                        let db = self.db();
×
230
                        match db
×
231
                            .fetch_kernel_by_excess_sig(s.clone())
×
232
                            .await
×
233
                            .rpc_status_internal_error(LOG_TARGET)?
×
234
                        {
235
                            None => TxSubmissionResponse {
×
236
                                accepted: false,
×
237
                                rejection_reason: TxSubmissionRejectionReason::DoubleSpend.into(),
×
238
                                is_synced,
×
239
                            },
×
240
                            Some(_) => TxSubmissionResponse {
×
241
                                accepted: false,
×
242
                                rejection_reason: TxSubmissionRejectionReason::AlreadyMined.into(),
×
243
                                is_synced,
×
244
                            },
×
245
                        }
246
                    },
247
                }
248
            },
249
        };
250
        Ok(Response::new(response))
×
251
    }
×
252

253
    async fn transaction_query(
254
        &self,
255
        request: Request<SignatureProto>,
256
    ) -> Result<Response<TxQueryResponse>, RpcStatus> {
×
257
        let state_machine = self.state_machine();
×
258

259
        // Determine if we are synced
260
        let status_watch = state_machine.get_status_info_watch();
×
261
        let is_synced = match status_watch.borrow().state_info {
×
262
            StateInfo::Listening(li) => li.is_synced(),
×
263
            _ => false,
×
264
        };
265

266
        let message = request.into_message();
×
267
        let signature =
×
268
            CompressedSignature::try_from(message).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
×
269

270
        let mut response = self.fetch_kernel(signature).await?;
×
271
        response.is_synced = is_synced;
×
272
        Ok(Response::new(response))
×
273
    }
×
274

275
    async fn transaction_batch_query(
276
        &self,
277
        request: Request<SignaturesProto>,
278
    ) -> Result<Response<TxQueryBatchResponses>, RpcStatus> {
×
279
        let state_machine = self.state_machine();
×
280

281
        // Determine if we are synced
282
        let status_watch = state_machine.get_status_info_watch();
×
283
        let is_synced = match (status_watch.borrow()).state_info {
×
284
            StateInfo::Listening(li) => li.is_synced(),
×
285
            _ => false,
×
286
        };
287

288
        let message = request.into_message();
×
289

290
        let mut responses: Vec<TxQueryBatchResponse> = Vec::new();
×
291

292
        let metadata = self
×
293
            .db
×
294
            .get_chain_metadata()
×
295
            .await
×
296
            .rpc_status_internal_error(LOG_TARGET)?;
×
297

298
        for sig in message.sigs {
×
299
            let signature =
×
300
                CompressedSignature::try_from(sig).map_err(|_| RpcStatus::bad_request("Signature was invalid"))?;
×
301
            let response: TxQueryResponse = self.fetch_kernel(signature.clone()).await?;
×
302
            responses.push(TxQueryBatchResponse {
×
303
                signature: Some(SignatureProto::from(&signature)),
×
304
                location: response.location,
×
305
                best_block_hash: response.best_block_hash,
×
306
                confirmations: response.confirmations,
×
307
                best_block_height: response.best_block_height.saturating_sub(response.confirmations),
×
308
                mined_timestamp: response.mined_timestamp,
×
309
            });
×
310
        }
311
        Ok(Response::new(TxQueryBatchResponses {
×
312
            responses,
×
313
            is_synced,
×
314
            best_block_hash: metadata.best_block_hash().to_vec(),
×
315
            best_block_height: metadata.best_block_height(),
×
316
            tip_mined_timestamp: metadata.timestamp(),
×
317
        }))
×
318
    }
×
319

320
    async fn fetch_matching_utxos(
321
        &self,
322
        request: Request<FetchMatchingUtxos>,
323
    ) -> Result<Response<FetchUtxosResponse>, RpcStatus> {
×
324
        let message = request.into_message();
×
325

326
        let state_machine = self.state_machine();
×
327
        // Determine if we are synced
328
        let status_watch = state_machine.get_status_info_watch();
×
329
        let is_synced = match (status_watch.borrow()).state_info {
×
330
            StateInfo::Listening(li) => li.is_synced(),
×
331
            _ => false,
×
332
        };
333

334
        let db = self.db();
×
335
        let mut res = Vec::with_capacity(message.output_hashes.len());
×
336
        let hashes: Vec<FixedHash> = message
×
337
            .output_hashes
×
338
            .into_iter()
×
339
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
340
            .collect::<Result<_, _>>()
×
341
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
342
        let utxos = db
×
343
            .fetch_outputs_with_spend_status_at_tip(hashes)
×
344
            .await
×
345
            .rpc_status_internal_error(LOG_TARGET)?
×
346
            .into_iter()
×
347
            .flatten();
×
348
        for (output, spent) in utxos {
×
349
            if !spent {
×
350
                res.push(output);
×
351
            }
×
352
        }
353

354
        Ok(Response::new(FetchUtxosResponse {
×
355
            outputs: res
×
356
                .into_iter()
×
357
                .map(TryInto::try_into)
×
358
                .collect::<Result<Vec<_>, String>>()
×
359
                .map_err(|err| RpcStatus::bad_request(&err))?,
×
360
            is_synced,
×
361
        }))
362
    }
×
363

364
    async fn utxo_query(&self, request: Request<UtxoQueryRequest>) -> Result<Response<UtxoQueryResponses>, RpcStatus> {
×
365
        let message = request.into_message();
×
366
        if message.output_hashes.is_empty() {
×
367
            return Err(RpcStatus::bad_request("Empty output hashes"));
×
368
        }
×
369
        const MAX_ALLOWED_QUERY_SIZE: usize = 512;
370
        if message.output_hashes.len() > MAX_ALLOWED_QUERY_SIZE {
×
371
            return Err(RpcStatus::bad_request(&format!(
×
372
                "Exceeded maximum allowed query hashes. Max: {MAX_ALLOWED_QUERY_SIZE}"
×
373
            )));
×
374
        }
×
375

376
        let db = self.db();
×
377

378
        debug!(
×
379
            target: LOG_TARGET,
×
380
            "Querying {} UTXO(s) for mined state",
×
381
            message.output_hashes.len(),
×
382
        );
383
        let hashes: Vec<FixedHash> = message
×
384
            .output_hashes
×
385
            .into_iter()
×
386
            .map(|hash| hash.try_into().map_err(|_| "Malformed pruned hash".to_string()))
×
387
            .collect::<Result<_, _>>()
×
388
            .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
389
        trace!(
×
390
            target: LOG_TARGET,
×
391
            "UTXO hashes queried from wallet: {:?}",
×
392
            hashes.iter().map(|h| h.to_hex()).collect::<Vec<String>>()
×
393
        );
394

395
        let mined_info_resp = db
×
396
            .fetch_outputs_mined_info(hashes)
×
397
            .await
×
398
            .rpc_status_internal_error(LOG_TARGET)?;
×
399

400
        let num_mined = mined_info_resp.iter().filter(|opt| opt.is_some()).count();
×
401
        debug!(
×
402
            target: LOG_TARGET,
×
403
            "Found {} mined and {} unmined UTXO(s)",
×
404
            num_mined,
405
            mined_info_resp.len() - num_mined
×
406
        );
407
        let metadata = self
×
408
            .db
×
409
            .get_chain_metadata()
×
410
            .await
×
411
            .rpc_status_internal_error(LOG_TARGET)?;
×
412

413
        Ok(Response::new(UtxoQueryResponses {
×
414
            best_block_height: metadata.best_block_height(),
×
415
            best_block_hash: metadata.best_block_hash().to_vec(),
×
416
            responses: mined_info_resp
×
417
                .into_iter()
×
418
                .flatten()
×
419
                .map(|utxo| {
×
420
                    Ok(UtxoQueryResponse {
421
                        mined_at_height: utxo.mined_height,
×
422
                        mined_in_block: utxo.header_hash.to_vec(),
×
423
                        output_hash: utxo.output.hash().to_vec(),
×
424
                        output: match utxo.output.try_into() {
×
425
                            Ok(output) => Some(output),
×
426
                            Err(err) => {
×
427
                                return Err(err);
×
428
                            },
429
                        },
430
                        mined_timestamp: utxo.mined_timestamp,
×
431
                    })
432
                })
×
433
                .collect::<Result<Vec<_>, String>>()
×
434
                .map_err(|err| RpcStatus::bad_request(&err))?,
×
435
        }))
436
    }
×
437

438
    async fn query_deleted(
439
        &self,
440
        request: Request<QueryDeletedRequest>,
441
    ) -> Result<Response<QueryDeletedResponse>, RpcStatus> {
×
442
        let message = request.into_message();
×
443
        if message.hashes.len() > MAX_QUERY_DELETED_HASHES {
×
444
            return Err(RpcStatus::bad_request(
×
445
                &"Received more hashes than we allow".to_string(),
×
446
            ));
×
447
        }
×
448
        let chain_include_header = message.chain_must_include_header;
×
449
        if !chain_include_header.is_empty() {
×
450
            let hash = chain_include_header
×
451
                .try_into()
×
452
                .map_err(|_| RpcStatus::bad_request(&"Malformed block hash received".to_string()))?;
×
453
            if self
×
454
                .db
×
455
                .fetch_header_by_block_hash(hash)
×
456
                .await
×
457
                .rpc_status_internal_error(LOG_TARGET)?
×
458
                .is_none()
×
459
            {
460
                return Err(RpcStatus::not_found(
×
461
                    "Chain does not include header. It might have been reorged out",
×
462
                ));
×
463
            }
×
464
        }
×
465
        let hashes: Vec<FixedHash> = message
×
466
            .hashes
×
467
            .into_iter()
×
468
            .map(|hash| hash.try_into())
×
469
            .collect::<Result<_, _>>()
×
470
            .map_err(|_| RpcStatus::bad_request(&"Malformed utxo hash received".to_string()))?;
×
471
        let mut return_data = Vec::with_capacity(hashes.len());
×
472
        let utxos = self
×
473
            .db
×
474
            .fetch_outputs_mined_info(hashes.clone())
×
475
            .await
×
476
            .rpc_status_internal_error(LOG_TARGET)?;
×
477
        let txos = self
×
478
            .db
×
479
            .fetch_inputs_mined_info(hashes)
×
480
            .await
×
481
            .rpc_status_internal_error(LOG_TARGET)?;
×
482
        if utxos.len() != txos.len() {
×
483
            return Err(RpcStatus::general("database returned different inputs vs outputs"));
×
484
        }
×
485
        for (utxo, txo) in utxos.iter().zip(txos.iter()) {
×
486
            let mut data = match utxo {
×
487
                None => QueryDeletedData {
×
488
                    mined_at_height: 0,
×
489
                    block_mined_in: Vec::new(),
×
490
                    height_deleted_at: 0,
×
491
                    block_deleted_in: Vec::new(),
×
492
                },
×
493
                Some(u) => QueryDeletedData {
×
494
                    mined_at_height: u.mined_height,
×
495
                    block_mined_in: u.header_hash.to_vec(),
×
496
                    height_deleted_at: 0,
×
497
                    block_deleted_in: Vec::new(),
×
498
                },
×
499
            };
500
            if let Some(input) = txo {
×
501
                data.height_deleted_at = input.spent_height;
×
502
                data.block_deleted_in = input.header_hash.to_vec();
×
503
            };
×
504
            return_data.push(data);
×
505
        }
506
        let metadata = self
×
507
            .db
×
508
            .get_chain_metadata()
×
509
            .await
×
510
            .rpc_status_internal_error(LOG_TARGET)?;
×
511

512
        Ok(Response::new(QueryDeletedResponse {
×
513
            best_block_height: metadata.best_block_height(),
×
514
            best_block_hash: metadata.best_block_hash().to_vec(),
×
515
            data: return_data,
×
516
        }))
×
517
    }
×
518

519
    async fn get_tip_info(&self, _request: Request<()>) -> Result<Response<TipInfoResponse>, RpcStatus> {
×
520
        let state_machine = self.state_machine();
×
521
        let status_watch = state_machine.get_status_info_watch();
×
522
        let is_synced = match status_watch.borrow().state_info {
×
523
            StateInfo::Listening(li) => li.is_synced(),
×
524
            _ => false,
×
525
        };
526

527
        let metadata = self
×
528
            .db
×
529
            .get_chain_metadata()
×
530
            .await
×
531
            .rpc_status_internal_error(LOG_TARGET)?;
×
532

533
        Ok(Response::new(TipInfoResponse {
×
534
            metadata: Some(metadata.into()),
×
535
            is_synced,
×
536
        }))
×
537
    }
×
538

539
    async fn get_header(&self, request: Request<u64>) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
540
        let height = request.into_message();
×
541
        let header = self
×
542
            .db()
×
543
            .fetch_header(height)
×
544
            .await
×
545
            .rpc_status_internal_error(LOG_TARGET)?
×
546
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {height}")))?;
×
547

548
        Ok(Response::new(header.into()))
×
549
    }
×
550

551
    async fn get_header_by_height(
552
        &self,
553
        request: Request<u64>,
554
    ) -> Result<Response<proto::core::BlockHeader>, RpcStatus> {
×
555
        let height = request.into_message();
×
556
        let header = self
×
557
            .db()
×
558
            .fetch_header(height)
×
559
            .await
×
560
            .rpc_status_internal_error(LOG_TARGET)?
×
561
            .ok_or_else(|| RpcStatus::not_found(&format!("Header not found at height {height}")))?;
×
562

563
        Ok(Response::new(header.into()))
×
564
    }
×
565

566
    async fn get_height_at_time(&self, request: Request<u64>) -> Result<Response<u64>, RpcStatus> {
×
567
        let requested_epoch_time: u64 = request.into_message();
×
568
        trace!(target: LOG_TARGET, "requested_epoch_time: {requested_epoch_time}");
×
569
        let tip_header = self
×
570
            .db()
×
571
            .fetch_tip_header()
×
572
            .await
×
573
            .rpc_status_internal_error(LOG_TARGET)?;
×
574

575
        let mut left_height = 0u64;
×
576
        let mut right_height = tip_header.height();
×
577
        trace!(
×
578
            target: LOG_TARGET,
×
579
            "requested_epoch_time: {}, left: {}, right: {}",
×
580
            requested_epoch_time,
581
            left_height,
582
            right_height
583
        );
584

585
        while left_height <= right_height {
×
586
            let mut mid_height = (left_height + right_height) / 2;
×
587

588
            if mid_height == 0 {
×
589
                return Ok(Response::new(0u64));
×
590
            }
×
591
            // If the two bounds are adjacent then perform the test between the right and left sides
592
            if left_height == mid_height {
×
593
                mid_height = right_height;
×
594
            }
×
595

596
            let mid_header = self
×
597
                .db()
×
598
                .fetch_header(mid_height)
×
599
                .await
×
600
                .rpc_status_internal_error(LOG_TARGET)?
×
601
                .ok_or_else(|| {
×
602
                    RpcStatus::not_found(&format!("Header not found during search at height {mid_height}"))
×
603
                })?;
×
604
            let before_mid_header = self
×
605
                .db()
×
606
                .fetch_header(mid_height - 1)
×
607
                .await
×
608
                .rpc_status_internal_error(LOG_TARGET)?
×
609
                .ok_or_else(|| {
×
610
                    RpcStatus::not_found(&format!("Header not found during search at height {}", mid_height - 1))
×
611
                })?;
×
612
            trace!(
×
613
                target: LOG_TARGET,
×
614
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
615
                requested_epoch_time,
616
                left_height,
617
                mid_height,
618
                mid_height-1,
×
619
                mid_header.timestamp.as_u64(),
×
620
                before_mid_header.timestamp.as_u64(),
×
621
                right_height
622
            );
623
            if requested_epoch_time < mid_header.timestamp.as_u64() &&
×
624
                requested_epoch_time >= before_mid_header.timestamp.as_u64()
×
625
            {
626
                trace!(
×
627
                    target: LOG_TARGET,
×
628
                    "requested_epoch_time: {}, selected height: {}",
×
629
                    requested_epoch_time, before_mid_header.height
630
                );
631
                return Ok(Response::new(before_mid_header.height));
×
632
            } else if mid_height == right_height {
×
633
                trace!(
×
634
                    target: LOG_TARGET,
×
635
                    "requested_epoch_time: {requested_epoch_time}, selected height: {right_height}"
×
636
                );
637
                return Ok(Response::new(right_height));
×
638
            } else if requested_epoch_time <= mid_header.timestamp.as_u64() {
×
639
                right_height = mid_height;
×
640
            } else {
×
641
                left_height = mid_height;
×
642
            }
×
643
        }
644

645
        Ok(Response::new(0u64))
×
646
    }
×
647

648
    async fn sync_utxos_by_block(
649
        &self,
650
        request: Request<SyncUtxosByBlockRequest>,
651
    ) -> Result<Streaming<SyncUtxosByBlockResponse>, RpcStatus> {
×
652
        let req = request.message();
×
653
        let peer = request.context().peer_node_id();
×
654
        debug!(
×
655
            target: LOG_TARGET,
×
656
            "Received sync_utxos_by_block request from {} from header {} to {} ",
×
657
            peer,
658
            req.start_header_hash.to_hex(),
×
659
            req.end_header_hash.to_hex(),
×
660
        );
661

662
        // Number of blocks to load and push to the stream before loading the next batch. Most blocks have 1 output but
663
        // full blocks will have 500
664
        const BATCH_SIZE: usize = 5;
665
        let (tx, rx) = mpsc::channel(BATCH_SIZE);
×
666
        let task = SyncUtxosByBlockTask::new(self.db());
×
667
        task.run(request.into_message(), tx).await?;
×
668

669
        Ok(Streaming::new(rx))
×
670
    }
×
671

672
    async fn get_mempool_fee_per_gram_stats(
673
        &self,
674
        request: Request<GetMempoolFeePerGramStatsRequest>,
675
    ) -> Result<Response<GetMempoolFeePerGramStatsResponse>, RpcStatus> {
×
676
        let req = request.into_message();
×
677
        let count =
×
678
            usize::try_from(req.count).map_err(|_| RpcStatus::bad_request("count must be less than or equal to 20"))?;
×
679

680
        if count > 20 {
×
681
            return Err(RpcStatus::bad_request("count must be less than or equal to 20"));
×
682
        }
×
683

684
        let metadata = self
×
685
            .db
×
686
            .get_chain_metadata()
×
687
            .await
×
688
            .rpc_status_internal_error(LOG_TARGET)?;
×
689
        let stats = self
×
690
            .mempool()
×
691
            .get_fee_per_gram_stats(count, metadata.best_block_height())
×
692
            .await
×
693
            .rpc_status_internal_error(LOG_TARGET)?;
×
694

695
        Ok(Response::new(stats.into()))
×
696
    }
×
697

698
    async fn get_wallet_query_http_service_address(
699
        &self,
700
        _request: Request<()>,
701
    ) -> Result<Response<GetWalletQueryHttpServiceAddressResponse>, RpcStatus> {
×
702
        Ok(Response::new(GetWalletQueryHttpServiceAddressResponse {
×
703
            http_address: self
×
704
                .wallet_query_service_address
×
705
                .clone()
×
706
                .map(|url| url.to_string())
×
707
                .unwrap_or_default(),
×
708
        }))
709
    }
×
710
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc