tari-project / tari, build 20166106361 (push, github)
SWvheerden: chore: new release v5.2.0-rc.0
12 Dec 2025 12:01PM UTC coverage: 60.642% (-0.09%) from 60.729%
70660 of 116519 relevant lines covered (60.64%); 225057.41 hits per line

Source file (55.78% covered): /base_layer/core/src/base_node/rpc/query_service.rs
// Copyright 2025 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::cmp;

use log::trace;
use serde_valid::{validation, Validate};
use tari_common_types::{
    types,
    types::{FixedHash, FixedHashSizeError},
};
use tari_transaction_components::{
    rpc::{
        models,
        models::{
            BlockUtxoInfo,
            GenerateKernelMerkleProofResponse,
            GetUtxosByBlockRequest,
            GetUtxosByBlockResponse,
            MinimalUtxoSyncInfo,
            SyncUtxosByBlockRequest,
            SyncUtxosByBlockResponseV0,
            SyncUtxosByBlockResponseV1,
            TipInfoResponse,
            TxLocation,
            TxQueryResponse,
        },
    },
    transaction_components::TransactionOutput,
};
use tari_utilities::{hex::Hex, ByteArray, ByteArrayError};
use thiserror::Error;

use crate::{
    base_node::{rpc::BaseNodeWalletQueryService, state_machine_service::states::StateInfo, StateMachineHandle},
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
    mempool::{service::MempoolHandle, MempoolServiceError, TxStorageResponse},
};

const LOG_TARGET: &str = "c::bn::rpc::query_service";
const SYNC_UTXOS_SPEND_TIP_SAFETY_LIMIT: u64 = 1000;
const WALLET_MAX_BLOCKS_PER_REQUEST: u64 = 100;

#[derive(Debug, Error)]
pub enum Error {
    #[error("Failed to get chain metadata: {0}")]
    FailedToGetChainMetadata(#[from] ChainStorageError),
    #[error("Header not found at height: {height}")]
    HeaderNotFound { height: u64 },
    #[error("Signature conversion error: {0}")]
    SignatureConversion(ByteArrayError),
    #[error("Mempool service error: {0}")]
    MempoolService(#[from] MempoolServiceError),
    #[error("Serde validation error: {0}")]
    SerdeValidation(#[from] validation::Errors),
    #[error("Hash conversion error: {0}")]
    HashConversion(#[from] FixedHashSizeError),
    #[error("Start header hash not found")]
    StartHeaderHashNotFound,
    #[error("End header hash not found")]
    EndHeaderHashNotFound,
    #[error("Header hash not found")]
    HeaderHashNotFound,
    #[error("Start header height {start_height} cannot be greater than the end header height {end_height}")]
    HeaderHeightMismatch { start_height: u64, end_height: u64 },
    #[error("Output not found")]
    OutputNotFound,
    #[error("A general error occurred: {0}")]
    General(anyhow::Error),
}

impl Error {
    fn general(err: impl Into<anyhow::Error>) -> Self {
        Error::General(err.into())
    }
}

pub struct Service<B> {
    db: AsyncBlockchainDb<B>,
    state_machine: StateMachineHandle,
    mempool: MempoolHandle,
}

impl<B: BlockchainBackend + 'static> Service<B> {
    pub fn new(db: AsyncBlockchainDb<B>, state_machine: StateMachineHandle, mempool: MempoolHandle) -> Self {
        Self {
            db,
            state_machine,
            mempool,
        }
    }

    fn state_machine(&self) -> StateMachineHandle {
        self.state_machine.clone()
    }

    fn db(&self) -> &AsyncBlockchainDb<B> {
        &self.db
    }

    fn mempool(&self) -> MempoolHandle {
        self.mempool.clone()
    }

    async fn fetch_kernel(&self, signature: types::CompressedSignature) -> Result<TxQueryResponse, Error> {
        let db = self.db();

        match db.fetch_kernel_by_excess_sig(signature.clone()).await? {
            None => (),
            Some((_, block_hash)) => match db.fetch_header_by_block_hash(block_hash).await? {
                None => (),
                Some(header) => {
                    let response = TxQueryResponse {
                        location: TxLocation::Mined,
                        mined_header_hash: Some(block_hash.to_vec()),
                        mined_height: Some(header.height),
                        mined_timestamp: Some(header.timestamp.as_u64()),
                    };
                    return Ok(response);
                },
            },
        };

        // If not in a block, then check the mempool
        let mut mempool = self.mempool();
        let mempool_response = match mempool.get_tx_state_by_excess_sig(signature.clone()).await? {
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
                location: TxLocation::InMempool,
                mined_header_hash: None,
                mined_height: None,
                mined_timestamp: None,
            },
            TxStorageResponse::ReorgPool |
            TxStorageResponse::NotStoredOrphan |
            TxStorageResponse::NotStoredTimeLocked |
            TxStorageResponse::NotStoredAlreadySpent |
            TxStorageResponse::NotStoredConsensus |
            TxStorageResponse::NotStored |
            TxStorageResponse::NotStoredFeeTooLow |
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
                location: TxLocation::NotStored,
                mined_timestamp: None,
                mined_height: None,
                mined_header_hash: None,
            },
        };

        Ok(mempool_response)
    }

    async fn fetch_utxos_by_block(&self, request: GetUtxosByBlockRequest) -> Result<GetUtxosByBlockResponse, Error> {
        request.validate()?;

        let hash = request.header_hash.clone().try_into()?;

        let header = self
            .db()
            .fetch_header_by_block_hash(hash)
            .await?
            .ok_or_else(|| Error::HeaderHashNotFound)?;

        // fetch utxos
        let outputs_with_statuses = self.db.fetch_outputs_in_block_with_spend_state(hash, None).await?;

        let outputs = outputs_with_statuses
            .into_iter()
            .map(|(output, _spent)| output)
            .collect::<Vec<TransactionOutput>>();

        // If it's empty, we still need to send an empty vec of outputs.
        let utxo_block_response = GetUtxosByBlockResponse {
            outputs,
            height: header.height,
            header_hash: hash.to_vec(),
            mined_timestamp: header.timestamp.as_u64(),
        };

        Ok(utxo_block_response)
    }

    #[allow(clippy::too_many_lines)]
    async fn fetch_utxos(&self, request: SyncUtxosByBlockRequest) -> Result<SyncUtxosByBlockResponseV0, Error> {
        // validate and fetch inputs
        request.validate()?;

        let hash = request.start_header_hash.clone().try_into()?;
        let start_header = self
            .db()
            .fetch_header_by_block_hash(hash)
            .await?
            .ok_or_else(|| Error::StartHeaderHashNotFound)?;

        let tip_header = self.db.fetch_tip_header().await?;
        // We only allow wallets to ask for a maximum of 100 blocks at a time, and we want to cache the queries to
        // ensure they come in batches of 100. We also round the request up to the nearest 100-block height so that
        // all wallets' queries can be cached.
        let increase = ((start_header.height + WALLET_MAX_BLOCKS_PER_REQUEST) / WALLET_MAX_BLOCKS_PER_REQUEST) *
            WALLET_MAX_BLOCKS_PER_REQUEST;
        let end_height = cmp::min(tip_header.header().height, increase);
        // pagination
        let start_header_height = start_header.height + (request.page * request.limit);
        if start_header_height > tip_header.header().height {
            return Err(Error::HeaderHeightMismatch {
                start_height: start_header.height,
                end_height: tip_header.header().height,
            });
        }
        let start_header = self
            .db
            .fetch_header(start_header_height)
            .await?
            .ok_or_else(|| Error::HeaderNotFound {
                height: start_header_height,
            })?;
        // fetch utxos
        let mut utxos = vec![];
        let next_page_start_height = start_header.height.saturating_add(request.limit);
        let mut current_header = start_header;
        let mut fetched_chunks = 0;
        let spending_end_header_hash = self
            .db
            .fetch_header(
                tip_header
                    .header()
                    .height
                    .saturating_sub(SYNC_UTXOS_SPEND_TIP_SAFETY_LIMIT),
            )
            .await?
            .ok_or_else(|| Error::HeaderNotFound {
                height: tip_header
                    .header()
                    .height
                    .saturating_sub(SYNC_UTXOS_SPEND_TIP_SAFETY_LIMIT),
            })?
            .hash();
        let next_header_to_request;
        let mut has_next_page = true;
        loop {
            let current_header_hash = current_header.hash();
            trace!(
                target: LOG_TARGET,
                "current header = {} ({})",
                current_header.height,
                current_header_hash.to_hex()
            );
            let outputs = if request.exclude_spent {
                self.db
                    .fetch_outputs_in_block_with_spend_state(current_header_hash, Some(spending_end_header_hash))
                    .await?
                    .into_iter()
                    .filter(|(_, spent)| !spent)
                    .map(|(output, _spent)| output)
                    .collect::<Vec<TransactionOutput>>()
            } else {
                self.db
                    .fetch_outputs_in_block_with_spend_state(current_header_hash, None)
                    .await?
                    .into_iter()
                    .map(|(output, _spent)| output)
                    .collect::<Vec<TransactionOutput>>()
            };
            let mut inputs = self
                .db
                .fetch_inputs_in_block(current_header_hash)
                .await?
                .into_iter()
                .map(|input| input.output_hash())
                .collect::<Vec<FixedHash>>();
            if outputs.is_empty() && inputs.is_empty() {
                // No outputs or inputs in this block, so push an empty placeholder so the wallet knows this height
                // has been scanned. This can happen if all the outputs are spent and exclude_spent is true.
                let block_response = BlockUtxoInfo {
                    outputs: Vec::new(),
                    inputs: Vec::new(),
                    height: current_header.height,
                    header_hash: current_header_hash.to_vec(),
                    mined_timestamp: current_header.timestamp.as_u64(),
                };
                utxos.push(block_response);
            }
            for output_chunk in outputs.chunks(2000) {
                let inputs_to_send = if inputs.is_empty() {
                    Vec::new()
                } else {
                    let num_to_drain = inputs.len().min(2000);
                    inputs.drain(..num_to_drain).map(|h| h.to_vec()).collect()
                };

                let output_block_response = BlockUtxoInfo {
                    outputs: output_chunk
                        .iter()
                        .map(|output| MinimalUtxoSyncInfo {
                            output_hash: output.hash().to_vec(),
                            commitment: output.commitment().to_vec(),
                            encrypted_data: output.encrypted_data().as_bytes().to_vec(),
                            sender_offset_public_key: output.sender_offset_public_key.to_vec(),
                        })
                        .collect(),
                    inputs: inputs_to_send,
                    height: current_header.height,
                    header_hash: current_header_hash.to_vec(),
                    mined_timestamp: current_header.timestamp.as_u64(),
                };
                utxos.push(output_block_response);
                fetched_chunks += 1;
            }
            // We might still have inputs left to send if there are more of them than there are outputs
            for input_chunk in inputs.chunks(2000) {
                let output_block_response = BlockUtxoInfo {
                    outputs: Vec::new(),
                    inputs: input_chunk.iter().map(|h| h.to_vec()).collect::<Vec<_>>().to_vec(),
                    height: current_header.height,
                    header_hash: current_header_hash.to_vec(),
                    mined_timestamp: current_header.timestamp.as_u64(),
                };
                utxos.push(output_block_response);
                fetched_chunks += 1;
            }

            if current_header.height >= tip_header.header().height {
                next_header_to_request = vec![];
                has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
                break;
            }
            if fetched_chunks > request.limit {
                next_header_to_request = current_header_hash.to_vec();
                // This is a special edge case: our request has reached the page limit, but we are not yet done with
                // the block. We don't want to split the block over two requests, so we need to remove the partial
                // block we added so that it can be requested fully in the next request. We also don't want to get
                // into a loop where the block can never fit into the page limit, so if the block is the same as the
                // first one, we just send it as is, partial. If not, we remove it and let it be sent in the next
                // request.
                if utxos.first().ok_or(Error::General(anyhow::anyhow!("No utxos found")))? // should never happen as we always add at least one block
                    .header_hash ==
                    current_header_hash.to_vec()
                {
                    // special edge case where the first block is also the last block we can send, so we just send it
                    // as is, partial
                    break;
                }
                while !utxos.is_empty() &&
                    utxos.last().ok_or(Error::General(anyhow::anyhow!("No utxos found")))? // should never happen as we always add at least one block
                    .header_hash ==
                        current_header_hash.to_vec()
                {
                    utxos.pop();
                }
                break;
            }
            if current_header.height + 1 > end_height {
                next_header_to_request = current_header.hash().to_vec();
                has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
                break; // Stop if we reach the end height
            }
            current_header =
                self.db
                    .fetch_header(current_header.height + 1)
                    .await?
                    .ok_or_else(|| Error::HeaderNotFound {
                        height: current_header.height + 1,
                    })?;

            if current_header.height == next_page_start_height {
                // we are on the limit, stop here
                next_header_to_request = current_header.hash().to_vec();
                has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
                break;
            }
        }
        Ok(SyncUtxosByBlockResponseV0 {
            blocks: utxos,
            has_next_page,
            next_header_to_scan: next_header_to_request,
        })
    }
}
377

378
#[async_trait::async_trait]
379
impl<B: BlockchainBackend + 'static> BaseNodeWalletQueryService for Service<B> {
380
    type Error = Error;
381

382
    async fn get_tip_info(&self) -> Result<TipInfoResponse, Self::Error> {
×
383
        let state_machine = self.state_machine();
×
384
        let status_watch = state_machine.get_status_info_watch();
×
385
        let is_synced = match status_watch.borrow().state_info {
×
386
            StateInfo::Listening(li) => li.is_synced(),
×
387
            _ => false,
×
388
        };
389

390
        let metadata = self.db.get_chain_metadata().await?;
×
391

392
        Ok(TipInfoResponse {
×
393
            metadata: Some(metadata),
×
394
            is_synced,
×
395
        })
×
396
    }
×
397

398
    async fn get_header_by_height(&self, height: u64) -> Result<models::BlockHeader, Self::Error> {
×
399
        let result = self
×
400
            .db
×
401
            .fetch_header(height)
×
402
            .await?
×
403
            .ok_or(Error::HeaderNotFound { height })?
×
404
            .into();
×
405
        Ok(result)
×
406
    }
×
407

408
    async fn get_height_at_time(&self, epoch_time: u64) -> Result<u64, Self::Error> {
×
409
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", epoch_time);
×
410
        let tip_header = self.db.fetch_tip_header().await?;
×
411

412
        let mut left_height = 0u64;
×
413
        let mut right_height = tip_header.height();
×
414

415
        while left_height <= right_height {
×
416
            let mut mid_height = (left_height + right_height) / 2;
×
417

418
            if mid_height == 0 {
×
419
                return Ok(0u64);
×
420
            }
×
421
            // If the two bounds are adjacent then perform the test between the right and left sides
422
            if left_height == mid_height {
×
423
                mid_height = right_height;
×
424
            }
×
425

426
            let mid_header = self
×
427
                .db
×
428
                .fetch_header(mid_height)
×
429
                .await?
×
430
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height })?;
×
431
            let before_mid_header = self
×
432
                .db
×
433
                .fetch_header(mid_height - 1)
×
434
                .await?
×
435
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height - 1 })?;
×
436
            trace!(
×
437
                target: LOG_TARGET,
×
438
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
439
                epoch_time,
440
                left_height,
441
                mid_height,
442
                mid_height-1,
×
443
                mid_header.timestamp.as_u64(),
×
444
                before_mid_header.timestamp.as_u64(),
×
445
                right_height
446
            );
447
            if epoch_time < mid_header.timestamp.as_u64() && epoch_time >= before_mid_header.timestamp.as_u64() {
×
448
                trace!(
×
449
                    target: LOG_TARGET,
×
450
                    "requested_epoch_time: {}, selected height: {}",
×
451
                    epoch_time, before_mid_header.height
452
                );
453
                return Ok(before_mid_header.height);
×
454
            } else if mid_height == right_height {
×
455
                trace!(
×
456
                    target: LOG_TARGET,
×
457
                    "requested_epoch_time: {epoch_time}, selected height: {right_height}"
×
458
                );
459
                return Ok(right_height);
×
460
            } else if epoch_time <= mid_header.timestamp.as_u64() {
×
461
                right_height = mid_height;
×
462
            } else {
×
463
                left_height = mid_height;
×
464
            }
×
465
        }
466

467
        Ok(0u64)
×
468
    }
×
469

470
    async fn transaction_query(
471
        &self,
472
        signature: crate::base_node::rpc::models::Signature,
473
    ) -> Result<TxQueryResponse, Self::Error> {
×
474
        let signature = signature.try_into().map_err(Error::SignatureConversion)?;
×
475

476
        let response = self.fetch_kernel(signature).await?;
×
477

478
        Ok(response)
×
479
    }
×
480

481
    async fn sync_utxos_by_block_v0(
482
        &self,
483
        request: SyncUtxosByBlockRequest,
484
    ) -> Result<SyncUtxosByBlockResponseV0, Self::Error> {
×
485
        self.fetch_utxos(request).await
×
486
    }
×
487

488
    async fn sync_utxos_by_block_v1(
489
        &self,
490
        request: SyncUtxosByBlockRequest,
491
    ) -> Result<SyncUtxosByBlockResponseV1, Self::Error> {
×
492
        let v1 = self.fetch_utxos(request).await?;
×
493
        Ok(v1.into())
×
494
    }
×
495

496
    async fn get_utxos_by_block(
497
        &self,
498
        request: GetUtxosByBlockRequest,
499
    ) -> Result<GetUtxosByBlockResponse, Self::Error> {
×
500
        self.fetch_utxos_by_block(request).await
×
501
    }
×
502

503
    async fn get_utxos_mined_info(
504
        &self,
505
        request: models::GetUtxosMinedInfoRequest,
506
    ) -> Result<models::GetUtxosMinedInfoResponse, Self::Error> {
×
507
        request.validate()?;
×
508

509
        let mut utxos = vec![];
×
510

511
        let tip_header = self.db().fetch_tip_header().await?;
×
512
        for hash in request.hashes {
×
513
            let hash = hash.try_into()?;
×
514
            let output = self.db().fetch_output(hash).await?;
×
515
            if let Some(output) = output {
×
516
                utxos.push(models::MinedUtxoInfo {
×
517
                    utxo_hash: hash.to_vec(),
×
518
                    mined_in_hash: output.header_hash.to_vec(),
×
519
                    mined_in_height: output.mined_height,
×
520
                    mined_in_timestamp: output.mined_timestamp,
×
521
                });
×
522
            }
×
523
        }
524

525
        Ok(models::GetUtxosMinedInfoResponse {
×
526
            utxos,
×
527
            best_block_hash: tip_header.hash().to_vec(),
×
528
            best_block_height: tip_header.height(),
×
529
        })
×
530
    }
×
531

532
    async fn get_utxos_deleted_info(
533
        &self,
534
        request: models::GetUtxosDeletedInfoRequest,
535
    ) -> Result<models::GetUtxosDeletedInfoResponse, Self::Error> {
×
536
        request.validate()?;
×
537

538
        let mut utxos = vec![];
×
539

540
        let must_include_header = request.must_include_header.clone().try_into()?;
×
541
        if self
×
542
            .db()
×
543
            .fetch_header_by_block_hash(must_include_header)
×
544
            .await?
×
545
            .is_none()
×
546
        {
547
            return Err(Error::HeaderHashNotFound);
×
548
        }
×
549

550
        let tip_header = self.db().fetch_tip_header().await?;
×
551
        for hash in request.hashes {
×
552
            let hash = hash.try_into()?;
×
553
            let output = self.db().fetch_output(hash).await?;
×
554

555
            if let Some(output) = output {
×
556
                // is it still unspent?
557
                let input = self.db().fetch_input(hash).await?;
×
558
                if let Some(i) = input {
×
559
                    utxos.push(models::DeletedUtxoInfo {
×
560
                        utxo_hash: hash.to_vec(),
×
561
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
562
                        spent_in_header: Some((i.spent_height, i.header_hash.to_vec())),
×
563
                    });
×
564
                } else {
×
565
                    utxos.push(models::DeletedUtxoInfo {
×
566
                        utxo_hash: hash.to_vec(),
×
567
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
568
                        spent_in_header: None,
×
569
                    });
×
570
                }
×
571
            } else {
×
572
                utxos.push(models::DeletedUtxoInfo {
×
573
                    utxo_hash: hash.to_vec(),
×
574
                    found_in_header: None,
×
575
                    spent_in_header: None,
×
576
                });
×
577
            }
×
578
        }
579

580
        Ok(models::GetUtxosDeletedInfoResponse {
×
581
            utxos,
×
582
            best_block_hash: tip_header.hash().to_vec(),
×
583
            best_block_height: tip_header.height(),
×
584
        })
×
585
    }
×
586

587
    async fn generate_kernel_merkle_proof(
588
        &self,
589
        excess_sig: types::CompressedSignature,
590
    ) -> Result<GenerateKernelMerkleProofResponse, Self::Error> {
×
591
        let proof = self.db().generate_kernel_merkle_proof(excess_sig).await?;
×
592
        Ok(GenerateKernelMerkleProofResponse {
593
            encoded_merkle_proof: bincode::serialize(&proof.merkle_proof).map_err(Error::general)?,
×
594
            block_hash: proof.block_hash,
×
595
            leaf_index: proof.leaf_index.value() as u64,
×
596
        })
597
    }
×
598

599
    async fn get_utxo(&self, request: models::GetUtxoRequest) -> Result<Option<TransactionOutput>, Self::Error> {
×
600
        let hash: FixedHash = request.output_hash.try_into().map_err(Error::general)?;
×
601
        let outputs = self.db().fetch_outputs_with_spend_status_at_tip(vec![hash]).await?;
×
602
        let output = match outputs.first() {
×
603
            Some(Some((output, _spent))) => Some(output.clone()),
×
604
            _ => return Err(Error::OutputNotFound),
×
605
        };
606
        Ok(output)
×
607
    }
×
608
}
609

#[cfg(test)]
mod tests {
    #![allow(clippy::indexing_slicing)]
    use tari_common::configuration::Network;
    use tari_shutdown::Shutdown;

    use super::*;
    use crate::test_helpers::blockchain::create_new_blockchain_with_network;

    fn make_state_machine_handle() -> StateMachineHandle {
        use tokio::sync::{broadcast, watch};
        let (state_tx, _state_rx) = broadcast::channel(10);
        let (_status_tx, status_rx) =
            watch::channel(crate::base_node::state_machine_service::states::StatusInfo::new());
        let shutdown = Shutdown::new();
        StateMachineHandle::new(state_tx, status_rx, shutdown.to_signal())
    }

    fn make_mempool_handle() -> MempoolHandle {
        use crate::mempool::test_utils::mock::create_mempool_service_mock;
        let (handle, _state) = create_mempool_service_mock();
        handle
    }

    async fn make_service() -> Service<crate::test_helpers::blockchain::TempDatabase> {
        let db = create_new_blockchain_with_network(Network::LocalNet);
        let adb = AsyncBlockchainDb::from(db);
        let state_machine = make_state_machine_handle();
        let mempool = make_mempool_handle();
        Service::new(adb, state_machine, mempool)
    }

    #[tokio::test]
    async fn fetch_utxos_start_header_not_found() {
        let service = make_service().await;
        let req = SyncUtxosByBlockRequest {
            start_header_hash: vec![0xAB; 32],
            limit: 4,
            page: 0,
            exclude_spent: false,
            version: 0,
        };
        let err = service.fetch_utxos(req).await.unwrap_err();
        match err {
            Error::StartHeaderHashNotFound => {},
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[tokio::test]
    async fn fetch_utxos_header_height_mismatch() {
        let service = make_service().await;
        let genesis = service.db().fetch_header(0).await.unwrap().unwrap();
        // page * limit moves the start height beyond the tip (height 0)
        let req = SyncUtxosByBlockRequest {
            start_header_hash: genesis.hash().to_vec(),
            limit: 1,
            page: 1,
            exclude_spent: false,
            version: 0,
        };
        let err = service.fetch_utxos(req).await.unwrap_err();
        match err {
            Error::HeaderHeightMismatch { .. } => {},
            other => panic!("unexpected error: {other:?}"),
        }
    }

    #[tokio::test]
    async fn fetch_utxos_paginates_results() {
        use crate::{block_specs, test_helpers::blockchain::create_main_chain};

        // Build a small chain: GB -> A -> B -> C -> D
        let db = create_new_blockchain_with_network(Network::LocalNet);
        let (_names, chain) = create_main_chain(&db, block_specs!(["A->GB"], ["B->A"], ["C->B"], ["D->C"]));

        // Construct the service over this DB
        let adb = AsyncBlockchainDb::from(db);
        let state_machine = make_state_machine_handle();
        let mempool = make_mempool_handle();
        let service = Service::new(adb, state_machine, mempool);

        // Use genesis as the start header hash
        let genesis = service.db().fetch_header(0).await.unwrap().unwrap();
        let g_hash = genesis.hash().to_vec();

        // Page 0, limit 1: should return only the genesis block (height 0) and the next header should be A
        // (the genesis block in LocalNet has no inputs or outputs)
        let resp0 = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash.clone(),
                limit: 1,
                page: 0,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos page 0 should succeed");

        assert_eq!(resp0.blocks.len(), 1, "expected exactly one block in first page");
        assert_eq!(resp0.blocks[0].height, 0, "first page should start at genesis height");

        // Expect next_header_to_scan to be the hash of block A (height 1)
        let a_hash = chain.get("A").unwrap().hash().to_vec();
        assert_eq!(resp0.next_header_to_scan, a_hash, "next header should point to A");

        // Page 1, limit 1: should return only block A (height 1) and the next header should be B
        let resp1 = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash.clone(),
                limit: 1,
                page: 1,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos page 1 should succeed");

        assert_eq!(resp1.blocks.len(), 1, "expected exactly one block in second page");
        assert_eq!(resp1.blocks[0].height, 1, "second page should start at height 1 (A)");

        // Expect next_header_to_scan to be the hash of block B (height 2)
        let b_hash = chain.get("B").unwrap().hash().to_vec();
        assert_eq!(resp1.next_header_to_scan, b_hash, "next header should point to B");

        // Page 2, limit 1: should return only block B (height 2) and the next header should be C
        let resp2 = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash.clone(),
                limit: 1,
                page: 2,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos page 2 should succeed");

        assert_eq!(resp2.blocks.len(), 1, "expected exactly one block in third page");
        assert_eq!(resp2.blocks[0].height, 2, "third page should start at height 2 (B)");
        let c_hash = chain.get("C").unwrap().hash().to_vec();
        assert_eq!(resp2.next_header_to_scan, c_hash, "next header should point to C");

        // Page 3, limit 1: should return only block C (height 3) and the next header should be D
        let resp3 = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash.clone(),
                limit: 1,
                page: 3,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos page 3 should succeed");

        assert_eq!(resp3.blocks.len(), 1, "expected exactly one block in fourth page");
        assert_eq!(resp3.blocks[0].height, 3, "fourth page should start at height 3 (C)");
        let d_hash = chain.get("D").unwrap().hash().to_vec();
        assert_eq!(resp3.next_header_to_scan, d_hash, "next header should point to D");

        // Page 4, limit 1: should return only block D (height 4) and the next header should be empty (tip reached)
        let resp4 = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash.clone(),
                limit: 1,
                page: 4,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos page 4 should succeed");

        assert_eq!(resp4.blocks.len(), 1, "expected exactly one block in fifth page");
        assert_eq!(resp4.blocks[0].height, 4, "fifth page should start at height 4 (D)");
        assert!(resp4.next_header_to_scan.is_empty(), "no next header at tip");

        // Limit 2, page 0: should return the genesis block and A, and the next header should be B
        let resp5 = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash,
                limit: 2,
                page: 0,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos should succeed");

        assert_eq!(resp5.blocks.len(), 2, "expected 2 blocks");
        assert_eq!(resp5.blocks[0].height, 0, "Should be block (GB)");
        assert_eq!(resp5.blocks[1].height, 1, "Should be block (A)");
        let b_hash = chain.get("B").unwrap().hash().to_vec();
        assert_eq!(resp5.next_header_to_scan, b_hash, "next header should point to B");
        assert!(resp5.has_next_page, "Should have more pages");
    }

    #[tokio::test]
    async fn large_fetch_utxo_paginates_results() {
        use crate::{block_specs, test_helpers::blockchain::create_main_chain};

        // Build a chain of 12 blocks on top of the genesis block: GB -> 1 -> 2 -> ... -> 12
        let db = create_new_blockchain_with_network(Network::LocalNet);
        let (_names, chain) = create_main_chain(
            &db,
            block_specs!(
                ["1->GB"],
                ["2->1"],
                ["3->2"],
                ["4->3"],
                ["5->4"],
                ["6->5"],
                ["7->6"],
                ["8->7"],
                ["9->8"],
                ["10->9"],
                ["11->10"],
                ["12->11"]
            ),
        );

        // Construct the service over this DB
        let adb = AsyncBlockchainDb::from(db);
        let state_machine = make_state_machine_handle();
        let mempool = make_mempool_handle();
        let service = Service::new(adb, state_machine, mempool);

        // Use genesis as the start header hash
        let genesis = service.db().fetch_header(0).await.unwrap().unwrap();
        let g_hash = genesis.hash().to_vec();

        let resp = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash.clone(),
                limit: 10,
                page: 0,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos should succeed");

        assert_eq!(resp.blocks.len(), 10, "expected 10 blocks");
        assert_eq!(resp.blocks[0].height, 0, "Should be the genesis block (GB)");
        assert_eq!(resp.blocks[1].height, 1, "Should be block (1)");
        assert_eq!(resp.blocks[2].height, 2, "Should be block (2)");
        assert_eq!(resp.blocks[3].height, 3, "Should be block (3)");
        assert_eq!(resp.blocks[4].height, 4, "Should be block (4)");
        assert_eq!(resp.blocks[5].height, 5, "Should be block (5)");
        assert_eq!(resp.blocks[6].height, 6, "Should be block (6)");
        assert_eq!(resp.blocks[7].height, 7, "Should be block (7)");
        assert_eq!(resp.blocks[8].height, 8, "Should be block (8)");
        assert_eq!(resp.blocks[9].height, 9, "Should be block (9)");
        let next_hash = chain.get("10").unwrap().hash().to_vec();
        assert_eq!(resp.next_header_to_scan, next_hash, "next header should point to 10");
        assert!(resp.has_next_page, "Should have more pages");

        let resp = service
            .fetch_utxos(SyncUtxosByBlockRequest {
                start_header_hash: g_hash,
                limit: 10,
                page: 1,
                exclude_spent: false,
                version: 0,
            })
            .await
            .expect("fetch_utxos should succeed");
        assert_eq!(resp.blocks.len(), 3, "expected 3 blocks");
        assert_eq!(resp.blocks[0].height, 10, "Should be block (10)");
        assert_eq!(resp.blocks[1].height, 11, "Should be block (11)");
        assert_eq!(resp.blocks[2].height, 12, "Should be block (12)");
        assert!(resp.next_header_to_scan.is_empty(), "Should be empty");
        assert!(!resp.has_next_page, "Should not have more pages");
    }
}
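
As a minimal, illustrative sketch (not part of the file above), this is how a wallet-side caller might walk the chain with sync_utxos_by_block_v0, following next_header_to_scan until the node reports the tip, in the same spirit as the tests. It assumes only the trait and request/response types shown in this file are in scope; the helper name scan_from and the page limit of 10 are arbitrary example values.

// Illustrative client-side helper (hypothetical, not from the source file): request pages of
// block UTXO info starting at a given header hash and keep following next_header_to_scan
// until the node returns an empty value, meaning the tip has been reached.
async fn scan_from<S: BaseNodeWalletQueryService>(
    service: &S,
    start_header_hash: Vec<u8>,
) -> Result<Vec<BlockUtxoInfo>, S::Error> {
    let mut blocks = Vec::new();
    let mut start = start_header_hash;
    loop {
        let resp = service
            .sync_utxos_by_block_v0(SyncUtxosByBlockRequest {
                start_header_hash: start.clone(),
                limit: 10, // illustrative page size, well under WALLET_MAX_BLOCKS_PER_REQUEST
                page: 0,
                exclude_spent: false,
                version: 0,
            })
            .await?;
        let next = resp.next_header_to_scan;
        blocks.extend(resp.blocks);
        if next.is_empty() {
            break; // tip reached
        }
        start = next;
    }
    Ok(blocks)
}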