
tari-project / tari · build 19333426770
13 Nov 2025 01:37PM UTC · coverage: 50.497% (-1.0%) from 51.536%
Triggered by a push via GitHub (committer: web-flow)
feat: add noreadahead to lmdb as config option (#7581)

Description
---
Added the `open::NORDAHEAD` flag to the LMDB builder as a config option.

From the LMDB documentation:

> Most operating systems perform readahead on read requests by default. This option turns it off if the OS supports it. Turning it off may help random read performance when the DB is larger than RAM and system RAM is full. The option is not implemented on Windows.

In line with this, disabling readahead may help reduce the seed nodes' RAM usage when needed, and may also benefit other Linux users whose databases are larger than available RAM.
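
For illustration, here is a minimal sketch of how the flag can be folded into the environment open flags using the `lmdb_zero` crate directly. The `disable_readahead` parameter, the `open_env` helper, and the map size / `maxdbs` values are placeholders for this example only; they are not the actual Tari config key or builder wiring added in this PR:

```rust
use lmdb_zero::open;

// Sketch only: `disable_readahead` stands in for the new config option; the real
// wiring lives in the Tari LMDB builder, not in this helper.
fn open_env(path: &str, disable_readahead: bool) -> Result<lmdb_zero::Environment, lmdb_zero::Error> {
    let mut flags = open::Flags::empty();
    if disable_readahead {
        // MDB_NORDAHEAD: ask the OS to skip readahead for this environment. Can help
        // random-read performance when the DB is larger than RAM; has no effect on Windows.
        flags |= open::NORDAHEAD;
    }

    let mut builder = lmdb_zero::EnvBuilder::new()?;
    builder.set_mapsize(16 * 1024 * 1024 * 1024)?; // illustrative map size
    builder.set_maxdbs(16)?; // illustrative database count

    // `open` is unsafe in lmdb-zero because some flag combinations can break memory safety.
    unsafe { builder.open(path, flags, 0o600) }
}
```

Leaving the flag off by default preserves the current behaviour, and on platforms where `MDB_NORDAHEAD` is not implemented (Windows) it simply has no effect.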

Motivation and Context
---
See #7578 for background.

How Has This Been Tested?
---
Not tested.

What process can a PR reviewer use to test or verify this change?
---
Code review.

Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

## Summary by CodeRabbit

* **New Features**
  * Added a configuration option to control OS readahead behavior for database storage; can be disabled for performance tuning (defaults preserve prior behavior).
* **Chores**
  * Storage initialization now respects the new readahead setting.
* **Tests**
  * Internal tests updated to reflect the new configuration parameter.

21 of 22 new or added lines in 2 files covered. (95.45%)

1256 existing lines in 34 files now uncovered.

57979 of 114817 relevant lines covered (50.5%)

325976.36 hits per line

Source File

/base_layer/core/src/base_node/rpc/query_service.rs (5.12% of lines covered)
// Copyright 2025 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::cmp;

use log::trace;
use serde_valid::{validation, Validate};
use tari_common_types::{
    types,
    types::{FixedHash, FixedHashSizeError},
};
use tari_transaction_components::{
    rpc::{
        models,
        models::{
            BlockUtxoInfo,
            GenerateKernelMerkleProofResponse,
            GetUtxosByBlockRequest,
            GetUtxosByBlockResponse,
            MinimalUtxoSyncInfo,
            SyncUtxosByBlockRequest,
            SyncUtxosByBlockResponse,
            TipInfoResponse,
            TxLocation,
            TxQueryResponse,
        },
    },
    transaction_components::TransactionOutput,
};
use tari_utilities::{hex::Hex, ByteArray, ByteArrayError};
use thiserror::Error;

use crate::{
    base_node::{rpc::BaseNodeWalletQueryService, state_machine_service::states::StateInfo, StateMachineHandle},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
    mempool::{service::MempoolHandle, MempoolServiceError, TxStorageResponse},
};

const LOG_TARGET: &str = "c::bn::rpc::query_service";
const SYNC_UTXOS_SPEND_TIP_SAFETY_LIMIT: u64 = 1000;

#[derive(Debug, Error)]
pub enum Error {
    #[error("Failed to get chain metadata: {0}")]
    FailedToGetChainMetadata(#[from] ChainStorageError),
    #[error("Header not found at height: {height}")]
    HeaderNotFound { height: u64 },
    #[error("Signature conversion error: {0}")]
    SignatureConversion(ByteArrayError),
    #[error("Mempool service error: {0}")]
    MempoolService(#[from] MempoolServiceError),
    #[error("Serde validation error: {0}")]
    SerdeValidation(#[from] validation::Errors),
    #[error("Hash conversion error: {0}")]
    HashConversion(#[from] FixedHashSizeError),
    #[error("Start header hash not found")]
    StartHeaderHashNotFound,
    #[error("End header hash not found")]
    EndHeaderHashNotFound,
    #[error("Header hash not found")]
    HeaderHashNotFound,
    #[error("Start header height {start_height} cannot be greater than the end header height {end_height}")]
    HeaderHeightMismatch { start_height: u64, end_height: u64 },
    #[error("Output not found")]
    OutputNotFound,
    #[error("A general error occurred: {0}")]
    General(anyhow::Error),
}

impl Error {
    fn general(err: impl Into<anyhow::Error>) -> Self {
        Error::General(err.into())
    }
}

pub struct Service<B> {
    db: AsyncBlockchainDb<B>,
    state_machine: StateMachineHandle,
    mempool: MempoolHandle,
}

impl<B: BlockchainBackend + 'static> Service<B> {
    pub fn new(db: AsyncBlockchainDb<B>, state_machine: StateMachineHandle, mempool: MempoolHandle) -> Self {
        Self {
            db,
            state_machine,
            mempool,
        }
    }

    fn state_machine(&self) -> StateMachineHandle {
        self.state_machine.clone()
    }

    fn db(&self) -> &AsyncBlockchainDb<B> {
        &self.db
    }

    fn mempool(&self) -> MempoolHandle {
        self.mempool.clone()
    }

    async fn fetch_kernel(&self, signature: types::CompressedSignature) -> Result<TxQueryResponse, Error> {
        let db = self.db();

        match db.fetch_kernel_by_excess_sig(signature.clone()).await? {
            None => (),
            Some((_, block_hash)) => match db.fetch_header_by_block_hash(block_hash).await? {
                None => (),
                Some(header) => {
                    let response = TxQueryResponse {
                        location: TxLocation::Mined,
                        mined_header_hash: Some(block_hash.to_vec()),
                        mined_height: Some(header.height),
                        mined_timestamp: Some(header.timestamp.as_u64()),
                    };
                    return Ok(response);
                },
            },
        };

        // If not in a block then check the mempool
        let mut mempool = self.mempool();
        let mempool_response = match mempool.get_tx_state_by_excess_sig(signature.clone()).await? {
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
                location: TxLocation::InMempool,
                mined_header_hash: None,
                mined_height: None,
                mined_timestamp: None,
            },
            TxStorageResponse::ReorgPool |
            TxStorageResponse::NotStoredOrphan |
            TxStorageResponse::NotStoredTimeLocked |
            TxStorageResponse::NotStoredAlreadySpent |
            TxStorageResponse::NotStoredConsensus |
            TxStorageResponse::NotStored |
            TxStorageResponse::NotStoredFeeTooLow |
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
                location: TxLocation::NotStored,
                mined_timestamp: None,
                mined_height: None,
                mined_header_hash: None,
            },
        };

        Ok(mempool_response)
    }

    async fn fetch_utxos_by_block(&self, request: GetUtxosByBlockRequest) -> Result<GetUtxosByBlockResponse, Error> {
        request.validate()?;

        let hash = request.header_hash.clone().try_into()?;

        let header = self
            .db()
            .fetch_header_by_block_hash(hash)
            .await?
            .ok_or_else(|| Error::HeaderHashNotFound)?;

        // fetch utxos
        let outputs_with_statuses = self.db.fetch_outputs_in_block_with_spend_state(hash, None).await?;

        let outputs = outputs_with_statuses
            .into_iter()
            .map(|(output, _spent)| output)
            .collect::<Vec<TransactionOutput>>();

        // if its empty, we need to send an empty vec of outputs.
        let utxo_block_response = GetUtxosByBlockResponse {
            outputs,
            height: header.height,
            header_hash: hash.to_vec(),
            mined_timestamp: header.timestamp.as_u64(),
        };

        Ok(utxo_block_response)
    }

    #[allow(clippy::too_many_lines)]
    async fn fetch_utxos(&self, request: SyncUtxosByBlockRequest) -> Result<SyncUtxosByBlockResponse, Error> {
        // validate and fetch inputs
        request.validate()?;

        let hash = request.start_header_hash.clone().try_into()?;
        let start_header = self
            .db()
            .fetch_header_by_block_hash(hash)
            .await?
            .ok_or_else(|| Error::StartHeaderHashNotFound)?;

        let tip_header = self.db.fetch_tip_header().await?;
        // we only allow wallets to ask for a max of 100 blocks at a time and we want to cache the queries to ensure
        // they are in batch of 100 and we want to ensure they request goes to the nearest 100 block height so
        // we can cache all wallet's queries
        let increase = ((start_header.height + 100) / 100) * 100;
        let end_height = cmp::min(tip_header.header().height, increase);

        // pagination
        let start_header_height = start_header.height + (request.page * request.limit);
        if start_header_height > tip_header.header().height {
            return Err(Error::HeaderHeightMismatch {
                start_height: start_header.height,
                end_height: tip_header.header().height,
            });
        }
        let start_header = self
            .db
            .fetch_header(start_header_height)
            .await?
            .ok_or_else(|| Error::HeaderNotFound {
                height: start_header_height,
            })?;
        // fetch utxos
        let mut utxos = vec![];
        let mut current_header = start_header;
        let mut fetched_utxos = 0;
        let spending_end_header_hash = self
            .db
            .fetch_header(
                tip_header
                    .header()
                    .height
                    .saturating_sub(SYNC_UTXOS_SPEND_TIP_SAFETY_LIMIT),
            )
            .await?
            .ok_or_else(|| Error::HeaderNotFound {
                height: tip_header
                    .header()
                    .height
                    .saturating_sub(SYNC_UTXOS_SPEND_TIP_SAFETY_LIMIT),
            })?
            .hash();
        let next_header_to_request;
        let mut has_next_page = false;
        loop {
            let current_header_hash = current_header.hash();
            trace!(
                target: LOG_TARGET,
                "current header = {} ({})",
                current_header.height,
                current_header_hash.to_hex()
            );
            let outputs = if request.exclude_spent {
                self.db
                    .fetch_outputs_in_block_with_spend_state(current_header_hash, Some(spending_end_header_hash))
                    .await?
                    .into_iter()
                    .filter(|(_, spent)| !spent)
                    .map(|(output, _spent)| output)
                    .collect::<Vec<TransactionOutput>>()
            } else {
                self.db
                    .fetch_outputs_in_block_with_spend_state(current_header_hash, None)
                    .await?
                    .into_iter()
                    .map(|(output, _spent)| output)
                    .collect::<Vec<TransactionOutput>>()
            };
            let mut inputs = self
                .db
                .fetch_inputs_in_block(current_header_hash)
                .await?
                .into_iter()
                .map(|input| input.output_hash())
                .collect::<Vec<FixedHash>>();
            for output_chunk in outputs.chunks(2000) {
                let inputs_to_send = if inputs.is_empty() {
                    Vec::new()
                } else {
                    let num_to_drain = inputs.len().min(2000);
                    inputs.drain(..num_to_drain).map(|h| h.to_vec()).collect()
                };

                let output_block_response = BlockUtxoInfo {
                    outputs: output_chunk
                        .iter()
                        .map(|output| MinimalUtxoSyncInfo {
                            output_hash: output.hash().to_vec(),
                            commitment: output.commitment().to_vec(),
                            encrypted_data: output.encrypted_data().as_bytes().to_vec(),
                            sender_offset_public_key: output.sender_offset_public_key.to_vec(),
                        })
                        .collect(),
                    inputs: inputs_to_send,
                    height: current_header.height,
                    header_hash: current_header_hash.to_vec(),
                    mined_timestamp: current_header.timestamp.as_u64(),
                };
                utxos.push(output_block_response);
            }
            // We might still have inputs left to send if they are more than the outputs
            for input_chunk in inputs.chunks(2000) {
                let output_block_response = BlockUtxoInfo {
                    outputs: Vec::new(),
                    inputs: input_chunk.iter().map(|h| h.to_vec()).collect::<Vec<_>>().to_vec(),
                    height: current_header.height,
                    header_hash: current_header_hash.to_vec(),
                    mined_timestamp: current_header.timestamp.as_u64(),
                };
                utxos.push(output_block_response);
            }

            fetched_utxos += 1;

            if current_header.height >= tip_header.header().height {
                next_header_to_request = vec![];
                has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
                break;
            }
            if fetched_utxos >= request.limit {
                next_header_to_request = current_header_hash.to_vec();
                // This is a special edge case, our request has reached the page limit, but we are also not done with
                // the block. We also dont want to split up the block over two requests. So we need to ensure that we
                // remove the partial block we added so that it can be requested fully in the next request. We also dont
                // want to get in a loop where the block cannot fit into the page limit, so if the block is the same as
                // the first one, we just send it as is, partial. If not we remove it and let it be sent in the next
                // request.
                if utxos.first().ok_or(Error::General(anyhow::anyhow!("No utxos founds")))? // should never happen as we always add at least one block
                    .header_hash ==
                    current_header_hash.to_vec()
                {
                    // special edge case where the first block is also the last block we can send, so we just send it as
                    // is, partial
                    break;
                }
                while !utxos.is_empty() &&
                    utxos.last().ok_or(Error::General(anyhow::anyhow!("No utxos found")))? // should never happen as we always add at least one block
                    .header_hash ==
                        current_header_hash.to_vec()
                {
                    utxos.pop();
                }
                break;
            }

            current_header =
                self.db
                    .fetch_header(current_header.height + 1)
                    .await?
                    .ok_or_else(|| Error::HeaderNotFound {
                        height: current_header.height + 1,
                    })?;
            if current_header.height == end_height {
                next_header_to_request = current_header.hash().to_vec();
                has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
                break; // Stop if we reach the end height
            }
        }

        Ok(SyncUtxosByBlockResponse {
            blocks: utxos,
            has_next_page,
            next_header_to_scan: next_header_to_request,
        })
    }
}

#[async_trait::async_trait]
impl<B: BlockchainBackend + 'static> BaseNodeWalletQueryService for Service<B> {
    type Error = Error;

    async fn get_tip_info(&self) -> Result<TipInfoResponse, Self::Error> {
        let state_machine = self.state_machine();
        let status_watch = state_machine.get_status_info_watch();
        let is_synced = match status_watch.borrow().state_info {
            StateInfo::Listening(li) => li.is_synced(),
            _ => false,
        };

        let metadata = self.db.get_chain_metadata().await?;

        Ok(TipInfoResponse {
            metadata: Some(metadata),
            is_synced,
        })
    }

    async fn get_header_by_height(&self, height: u64) -> Result<models::BlockHeader, Self::Error> {
        let result = self
            .db
            .fetch_header(height)
            .await?
            .ok_or(Error::HeaderNotFound { height })?
            .into();
        Ok(result)
    }

    async fn get_height_at_time(&self, epoch_time: u64) -> Result<u64, Self::Error> {
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", epoch_time);
        let tip_header = self.db.fetch_tip_header().await?;

        let mut left_height = 0u64;
        let mut right_height = tip_header.height();

        while left_height <= right_height {
            let mut mid_height = (left_height + right_height) / 2;

            if mid_height == 0 {
                return Ok(0u64);
            }
            // If the two bounds are adjacent then perform the test between the right and left sides
            if left_height == mid_height {
                mid_height = right_height;
            }

            let mid_header = self
                .db
                .fetch_header(mid_height)
                .await?
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height })?;
            let before_mid_header = self
                .db
                .fetch_header(mid_height - 1)
                .await?
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height - 1 })?;
            trace!(
                target: LOG_TARGET,
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
                epoch_time,
                left_height,
                mid_height,
                mid_height-1,
                mid_header.timestamp.as_u64(),
                before_mid_header.timestamp.as_u64(),
                right_height
            );
            if epoch_time < mid_header.timestamp.as_u64() && epoch_time >= before_mid_header.timestamp.as_u64() {
                trace!(
                    target: LOG_TARGET,
                    "requested_epoch_time: {}, selected height: {}",
                    epoch_time, before_mid_header.height
                );
                return Ok(before_mid_header.height);
            } else if mid_height == right_height {
                trace!(
                    target: LOG_TARGET,
                    "requested_epoch_time: {epoch_time}, selected height: {right_height}"
                );
                return Ok(right_height);
            } else if epoch_time <= mid_header.timestamp.as_u64() {
                right_height = mid_height;
            } else {
                left_height = mid_height;
            }
        }

        Ok(0u64)
    }

    async fn transaction_query(
        &self,
        signature: crate::base_node::rpc::models::Signature,
    ) -> Result<TxQueryResponse, Self::Error> {
        let signature = signature.try_into().map_err(Error::SignatureConversion)?;

        let response = self.fetch_kernel(signature).await?;

        Ok(response)
    }

    async fn sync_utxos_by_block(
        &self,
        request: SyncUtxosByBlockRequest,
    ) -> Result<SyncUtxosByBlockResponse, Self::Error> {
        self.fetch_utxos(request).await
    }

    async fn get_utxos_by_block(
        &self,
        request: GetUtxosByBlockRequest,
    ) -> Result<GetUtxosByBlockResponse, Self::Error> {
        self.fetch_utxos_by_block(request).await
    }

    async fn get_utxos_mined_info(
        &self,
        request: models::GetUtxosMinedInfoRequest,
    ) -> Result<models::GetUtxosMinedInfoResponse, Self::Error> {
        request.validate()?;

        let mut utxos = vec![];

        let tip_header = self.db().fetch_tip_header().await?;
        for hash in request.hashes {
            let hash = hash.try_into()?;
            let output = self.db().fetch_output(hash).await?;
            if let Some(output) = output {
                utxos.push(models::MinedUtxoInfo {
                    utxo_hash: hash.to_vec(),
                    mined_in_hash: output.header_hash.to_vec(),
                    mined_in_height: output.mined_height,
                    mined_in_timestamp: output.mined_timestamp,
                });
            }
        }

        Ok(models::GetUtxosMinedInfoResponse {
            utxos,
            best_block_hash: tip_header.hash().to_vec(),
            best_block_height: tip_header.height(),
        })
    }

    async fn get_utxos_deleted_info(
        &self,
        request: models::GetUtxosDeletedInfoRequest,
    ) -> Result<models::GetUtxosDeletedInfoResponse, Self::Error> {
        request.validate()?;

        let mut utxos = vec![];

        let must_include_header = request.must_include_header.clone().try_into()?;
        if self
            .db()
            .fetch_header_by_block_hash(must_include_header)
            .await?
            .is_none()
        {
            return Err(Error::HeaderHashNotFound);
        }

        let tip_header = self.db().fetch_tip_header().await?;
        for hash in request.hashes {
            let hash = hash.try_into()?;
            let output = self.db().fetch_output(hash).await?;

            if let Some(output) = output {
                // is it still unspent?
                let input = self.db().fetch_input(hash).await?;
                if let Some(i) = input {
                    utxos.push(models::DeletedUtxoInfo {
                        utxo_hash: hash.to_vec(),
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
                        spent_in_header: Some((i.spent_height, i.header_hash.to_vec())),
                    });
                } else {
                    utxos.push(models::DeletedUtxoInfo {
                        utxo_hash: hash.to_vec(),
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
                        spent_in_header: None,
                    });
                }
            } else {
                utxos.push(models::DeletedUtxoInfo {
                    utxo_hash: hash.to_vec(),
                    found_in_header: None,
                    spent_in_header: None,
                });
            }
        }

        Ok(models::GetUtxosDeletedInfoResponse {
            utxos,
            best_block_hash: tip_header.hash().to_vec(),
            best_block_height: tip_header.height(),
        })
    }

    async fn generate_kernel_merkle_proof(
        &self,
        excess_sig: types::CompressedSignature,
    ) -> Result<GenerateKernelMerkleProofResponse, Self::Error> {
        let proof = self.db().generate_kernel_merkle_proof(excess_sig).await?;
        Ok(GenerateKernelMerkleProofResponse {
            encoded_merkle_proof: bincode::serialize(&proof.merkle_proof).map_err(Error::general)?,
            block_hash: proof.block_hash,
            leaf_index: proof.leaf_index.value() as u64,
        })
    }

    async fn get_utxo(&self, request: models::GetUtxoRequest) -> Result<models::GetUtxoResponse, Self::Error> {
        let hash: FixedHash = request.output_hash.try_into().map_err(Error::general)?;
        let outputs = self.db().fetch_outputs_with_spend_status_at_tip(vec![hash]).await?;
        let output = match outputs.first() {
            Some(Some((output, _spent))) => Some(output.clone()),
            _ => return Err(Error::OutputNotFound),
        };
        Ok(models::GetUtxoResponse { output })
    }
}

#[cfg(test)]
mod tests {
    use tari_common::configuration::Network;
    use tari_shutdown::Shutdown;

    use super::*;
    use crate::test_helpers::blockchain::create_new_blockchain_with_network;
    fn make_state_machine_handle() -> StateMachineHandle {
        use tokio::sync::{broadcast, watch};
        let (state_tx, _state_rx) = broadcast::channel(10);
        let (_status_tx, status_rx) =
            watch::channel(crate::base_node::state_machine_service::states::StatusInfo::new());
        let shutdown = Shutdown::new();
        StateMachineHandle::new(state_tx, status_rx, shutdown.to_signal())
    }

    fn make_mempool_handle() -> MempoolHandle {
        use crate::mempool::test_utils::mock::create_mempool_service_mock;
        let (handle, _state) = create_mempool_service_mock();
        handle
    }

    async fn make_service() -> Service<crate::test_helpers::blockchain::TempDatabase> {
        let db = create_new_blockchain_with_network(Network::default());
        let adb = AsyncBlockchainDb::from(db);
        let state_machine = make_state_machine_handle();
        let mempool = make_mempool_handle();
        Service::new(adb, state_machine, mempool)
    }

    #[tokio::test]
    async fn fetch_utxos_start_header_not_found() {
        let service = make_service().await;
        let req = SyncUtxosByBlockRequest {
            start_header_hash: vec![0xAB; 32],
            limit: 4,
            page: 0,
            exclude_spent: false,
        };
        let err = service.fetch_utxos(req).await.unwrap_err();
        match err {
            Error::StartHeaderHashNotFound => {},
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[tokio::test]
    async fn fetch_utxos_header_height_mismatch() {
        let service = make_service().await;
        let genesis = service.db().fetch_header(0).await.unwrap().unwrap();
        // page * limit moves start height beyond tip (0)
        let req = SyncUtxosByBlockRequest {
            start_header_hash: genesis.hash().to_vec(),
            limit: 1,
            page: 1,
            exclude_spent: false,
        };
        let err = service.fetch_utxos(req).await.unwrap_err();
        match err {
            Error::HeaderHeightMismatch { .. } => {},
            other => panic!("unexpected error: {other:?}"),
        }
    }
}