• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 18586282491

17 Oct 2025 07:53AM UTC coverage: 58.577% (-1.0%) from 59.552%
18586282491

push

github

web-flow
fix: change how pagination works on utxo scanning (#7546)

Description
---
Properly handle the special edge case where the request has reached the
page limit but we are not yet done with the block. We also don't want
to split the block over two requests, so we need to ensure that we
remove the partial block we added so that it can be requested in full in
the next request. We also don't want to get into a loop where the block
cannot fit into the page limit, so if the block is the same as the first
one, we just send it as is, partial. If not, we remove it and let it be
sent in the next request.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Bug Fixes**
* Fixed edge cases in paginated UTXO queries when a page ends mid-block
to prevent partial or duplicated results.
* Ensures correct continuation across block boundaries and early exit
when the first block is also the last that can be sent.
* Removes trailing blocks or partial entries so subsequent pages request
the correct data.
  * Improves error reporting for rare pagination inconsistencies.
* Preserves behavior when limits aren’t reached or the chain end is
encountered.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

0 of 10 new or added lines in 1 file covered. (0.0%)

1137 existing lines in 32 files now uncovered.

66481 of 113494 relevant lines covered (58.58%)

228684.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/query_service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::cmp;
5

6
use log::trace;
7
use serde_valid::{validation, Validate};
8
use tari_common_types::{types, types::FixedHashSizeError};
9
use tari_transaction_components::{
10
    rpc::{
11
        models,
12
        models::{
13
            BlockUtxoInfo,
14
            GenerateKernelMerkleProofResponse,
15
            GetUtxosByBlockRequest,
16
            GetUtxosByBlockResponse,
17
            MinimalUtxoSyncInfo,
18
            SyncUtxosByBlockRequest,
19
            SyncUtxosByBlockResponse,
20
            TipInfoResponse,
21
            TxLocation,
22
            TxQueryResponse,
23
        },
24
    },
25
    transaction_components::TransactionOutput,
26
};
27
use tari_utilities::{hex::Hex, ByteArray, ByteArrayError};
28
use thiserror::Error;
29

30
use crate::{
31
    base_node::{rpc::BaseNodeWalletQueryService, state_machine_service::states::StateInfo, StateMachineHandle},
32
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
33
    mempool::{service::MempoolHandle, MempoolServiceError, TxStorageResponse},
34
};
35

36
/// Log target passed to every `trace!` call in this module.
const LOG_TARGET: &str = "c::bn::rpc::query_service";
37

38
#[derive(Debug, Error)]
39
pub enum Error {
40
    #[error("Failed to get chain metadata: {0}")]
41
    FailedToGetChainMetadata(#[from] ChainStorageError),
42
    #[error("Header not found at height: {height}")]
43
    HeaderNotFound { height: u64 },
44
    #[error("Signature conversion error: {0}")]
45
    SignatureConversion(ByteArrayError),
46
    #[error("Mempool service error: {0}")]
47
    MempoolService(#[from] MempoolServiceError),
48
    #[error("Serde validation error: {0}")]
49
    SerdeValidation(#[from] validation::Errors),
50
    #[error("Hash conversion error: {0}")]
51
    HashConversion(#[from] FixedHashSizeError),
52
    #[error("Start header hash not found")]
53
    StartHeaderHashNotFound,
54
    #[error("End header hash not found")]
55
    EndHeaderHashNotFound,
56
    #[error("Header hash not found")]
57
    HeaderHashNotFound,
58
    #[error("Start header height {start_height} cannot be greater than the end header height {end_height}")]
59
    HeaderHeightMismatch { start_height: u64, end_height: u64 },
60
    #[error("A general error occurred: {0}")]
61
    General(anyhow::Error),
62
}
63

64
impl Error {
65
    fn general(err: impl Into<anyhow::Error>) -> Self {
×
66
        Error::General(err.into())
×
67
    }
×
68
}
69

70
pub struct Service<B> {
71
    db: AsyncBlockchainDb<B>,
72
    state_machine: StateMachineHandle,
73
    mempool: MempoolHandle,
74
}
75

76
impl<B: BlockchainBackend + 'static> Service<B> {
77
    pub fn new(db: AsyncBlockchainDb<B>, state_machine: StateMachineHandle, mempool: MempoolHandle) -> Self {
×
78
        Self {
×
79
            db,
×
80
            state_machine,
×
81
            mempool,
×
82
        }
×
83
    }
×
84

85
    /// Returns a clone of the state machine handle.
    fn state_machine(&self) -> StateMachineHandle {
        Clone::clone(&self.state_machine)
    }
×
88

89
    fn db(&self) -> &AsyncBlockchainDb<B> {
×
90
        &self.db
×
91
    }
×
92

93
    /// Returns a clone of the mempool handle.
    fn mempool(&self) -> MempoolHandle {
        Clone::clone(&self.mempool)
    }
×
96

97
    async fn fetch_kernel(&self, signature: types::CompressedSignature) -> Result<TxQueryResponse, Error> {
×
98
        let db = self.db();
×
99

100
        match db.fetch_kernel_by_excess_sig(signature.clone()).await? {
×
101
            None => (),
×
102
            Some((_, block_hash)) => match db.fetch_header_by_block_hash(block_hash).await? {
×
103
                None => (),
×
104
                Some(header) => {
×
105
                    let response = TxQueryResponse {
×
106
                        location: TxLocation::Mined,
×
107
                        mined_header_hash: Some(block_hash.to_vec()),
×
108
                        mined_height: Some(header.height),
×
109
                        mined_timestamp: Some(header.timestamp.as_u64()),
×
110
                    };
×
111
                    return Ok(response);
×
112
                },
113
            },
114
        };
115

116
        // If not in a block then check the mempool
117
        let mut mempool = self.mempool();
×
118
        let mempool_response = match mempool.get_tx_state_by_excess_sig(signature.clone()).await? {
×
119
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
×
120
                location: TxLocation::InMempool,
×
121
                mined_header_hash: None,
×
122
                mined_height: None,
×
123
                mined_timestamp: None,
×
124
            },
×
125
            TxStorageResponse::ReorgPool |
126
            TxStorageResponse::NotStoredOrphan |
127
            TxStorageResponse::NotStoredTimeLocked |
128
            TxStorageResponse::NotStoredAlreadySpent |
129
            TxStorageResponse::NotStoredConsensus |
130
            TxStorageResponse::NotStored |
131
            TxStorageResponse::NotStoredFeeTooLow |
132
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
×
133
                location: TxLocation::NotStored,
×
134
                mined_timestamp: None,
×
135
                mined_height: None,
×
136
                mined_header_hash: None,
×
137
            },
×
138
        };
139

140
        Ok(mempool_response)
×
141
    }
×
142

143
    async fn fetch_utxos_by_block(&self, request: GetUtxosByBlockRequest) -> Result<GetUtxosByBlockResponse, Error> {
×
144
        request.validate()?;
×
145

146
        let hash = request.header_hash.clone().try_into()?;
×
147

148
        let header = self
×
149
            .db()
×
150
            .fetch_header_by_block_hash(hash)
×
151
            .await?
×
152
            .ok_or_else(|| Error::HeaderHashNotFound)?;
×
153

154
        // fetch utxos
155
        let outputs_with_statuses = self.db.fetch_outputs_in_block_with_spend_state(hash, None).await?;
×
156

157
        let outputs = outputs_with_statuses
×
158
            .into_iter()
×
159
            .map(|(output, _spent)| output)
×
160
            .collect::<Vec<TransactionOutput>>();
×
161

162
        // if its empty, we need to send an empty vec of outputs.
163
        let utxo_block_response = GetUtxosByBlockResponse {
×
164
            outputs,
×
165
            height: header.height,
×
166
            header_hash: hash.to_vec(),
×
167
            mined_timestamp: header.timestamp.as_u64(),
×
168
        };
×
169

170
        Ok(utxo_block_response)
×
171
    }
×
172

173
    #[allow(clippy::too_many_lines)]
174
    async fn fetch_utxos(&self, request: SyncUtxosByBlockRequest) -> Result<SyncUtxosByBlockResponse, Error> {
×
175
        // validate and fetch inputs
176
        request.validate()?;
×
177

178
        let hash = request.start_header_hash.clone().try_into()?;
×
179

180
        let start_header = self
×
181
            .db()
×
182
            .fetch_header_by_block_hash(hash)
×
183
            .await?
×
184
            .ok_or_else(|| Error::StartHeaderHashNotFound)?;
×
185

186
        let tip_header = self.db.fetch_tip_header().await?;
×
187
        // we only allow wallets to ask for a max of 100 blocks at a time and we want to cache the queries to ensure
188
        // they are in batch of 100 and we want to ensure they request goes to the nearest 100 block height so
189
        // we can cache all wallet's queries
190
        let increase = ((start_header.height + 100) / 100) * 100;
×
191
        let end_height = cmp::min(tip_header.header().height, increase);
×
192

193
        // pagination
194
        let start_header_height = start_header.height + (request.page * request.limit);
×
195
        let start_header = self
×
196
            .db
×
197
            .fetch_header(start_header_height)
×
198
            .await?
×
199
            .ok_or_else(|| Error::HeaderNotFound {
×
200
                height: start_header_height,
×
201
            })?;
×
202

203
        if start_header.height > tip_header.header().height {
×
204
            return Err(Error::HeaderHeightMismatch {
×
205
                start_height: start_header.height,
×
206
                end_height: tip_header.header().height,
×
207
            });
×
208
        }
×
209

210
        // fetch utxos
211
        let mut utxos = vec![];
×
212
        let mut current_header = start_header;
×
213
        let mut fetched_utxos = 0;
×
214
        let next_header_to_request;
215
        loop {
216
            let current_header_hash = current_header.hash();
×
217

218
            trace!(
×
219
                target: LOG_TARGET,
×
220
                "current header = {} ({})",
×
221
                current_header.height,
222
                current_header_hash.to_hex()
×
223
            );
224

225
            let outputs_with_statuses = self
×
226
                .db
×
227
                .fetch_outputs_in_block_with_spend_state(current_header.hash(), None)
×
228
                .await?;
×
229
            let mut inputs = self
×
230
                .db
×
231
                .fetch_inputs_in_block(current_header.hash())
×
232
                .await?
×
233
                .into_iter()
×
234
                .map(|input| input.output_hash().to_vec())
×
235
                .collect::<Vec<Vec<u8>>>();
×
236

237
            let outputs = outputs_with_statuses
×
238
                .into_iter()
×
239
                .map(|(output, _spent)| output)
×
240
                .collect::<Vec<TransactionOutput>>();
×
241

242
            for output_chunk in outputs.chunks(2000) {
×
243
                let inputs_to_send = if inputs.is_empty() {
×
244
                    Vec::new()
×
245
                } else {
246
                    let num_to_drain = inputs.len().min(2000);
×
247
                    inputs.drain(..num_to_drain).collect()
×
248
                };
249

250
                let output_block_response = BlockUtxoInfo {
×
251
                    outputs: output_chunk
×
252
                        .iter()
×
253
                        .map(|output| MinimalUtxoSyncInfo {
×
254
                            output_hash: output.hash().to_vec(),
×
255
                            commitment: output.commitment().to_vec(),
×
256
                            encrypted_data: output.encrypted_data().as_bytes().to_vec(),
×
257
                            sender_offset_public_key: output.sender_offset_public_key.to_vec(),
×
258
                        })
×
259
                        .collect(),
×
260
                    inputs: inputs_to_send,
×
261
                    height: current_header.height,
×
262
                    header_hash: current_header_hash.to_vec(),
×
263
                    mined_timestamp: current_header.timestamp.as_u64(),
×
264
                };
265
                utxos.push(output_block_response);
×
266
            }
267
            // We might still have inputs left to send if they are more than the outputs
268
            for input_chunk in inputs.chunks(2000) {
×
269
                let output_block_response = BlockUtxoInfo {
×
270
                    outputs: Vec::new(),
×
271
                    inputs: input_chunk.to_vec(),
×
272
                    height: current_header.height,
×
273
                    header_hash: current_header_hash.to_vec(),
×
274
                    mined_timestamp: current_header.timestamp.as_u64(),
×
275
                };
×
276
                utxos.push(output_block_response);
×
277
            }
×
278

279
            fetched_utxos += 1;
×
280

281
            if current_header.height >= tip_header.header().height {
×
282
                next_header_to_request = vec![];
×
283
                break;
×
284
            }
×
285
            if fetched_utxos >= request.limit {
×
286
                next_header_to_request = current_header.hash().to_vec();
×
287
                // This is a special edge case, our request has reached the page limit, but we are also not done with
288
                // the block. We also dont want to split up the block over two requests. So we need to ensure that we
289
                // remove the partial block we added so that it can be requested fully in the next request. We also dont
290
                // want to get in a loop where the block cannot fit into the page limit, so if the block is the same as
291
                // the first one, we just send it as is, partial. If not we remove it and let it be sent in the next
292
                // request.
NEW
293
                if utxos.first().ok_or(Error::General(anyhow::anyhow!("No utxos founds")))? // should never happen as we always add at least one block
×
294
                    .header_hash ==
NEW
295
                    current_header.hash().to_vec()
×
296
                {
297
                    // special edge case where the first block is also the last block we can send, so we just send it as
298
                    // is, partial
NEW
299
                    break;
×
NEW
300
                }
×
NEW
301
                while !utxos.is_empty() &&
×
NEW
302
                    utxos.last().ok_or(Error::General(anyhow::anyhow!("No utxos found")))? // should never happen as we always add at least one block
×
303
                    .header_hash ==
NEW
304
                        current_header.hash().to_vec()
×
NEW
305
                {
×
NEW
306
                    utxos.pop();
×
NEW
307
                }
×
308
                break;
×
309
            }
×
310

311
            current_header =
×
312
                self.db
×
313
                    .fetch_header(current_header.height + 1)
×
314
                    .await?
×
315
                    .ok_or_else(|| Error::HeaderNotFound {
×
316
                        height: current_header.height + 1,
×
317
                    })?;
×
318
            if current_header.height == end_height {
×
319
                next_header_to_request = current_header.hash().to_vec();
×
320
                break; // Stop if we reach the end height}
×
321
            }
×
322
        }
323

324
        let has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
×
325

326
        Ok(SyncUtxosByBlockResponse {
×
327
            blocks: utxos,
×
328
            has_next_page,
×
329
            next_header_to_scan: next_header_to_request,
×
330
        })
×
331
    }
×
332
}
333

334
#[async_trait::async_trait]
335
impl<B: BlockchainBackend + 'static> BaseNodeWalletQueryService for Service<B> {
336
    type Error = Error;
337

338
    async fn get_tip_info(&self) -> Result<TipInfoResponse, Self::Error> {
×
339
        let state_machine = self.state_machine();
×
340
        let status_watch = state_machine.get_status_info_watch();
×
341
        let is_synced = match status_watch.borrow().state_info {
×
342
            StateInfo::Listening(li) => li.is_synced(),
×
343
            _ => false,
×
344
        };
345

346
        let metadata = self.db.get_chain_metadata().await?;
×
347

348
        Ok(TipInfoResponse {
×
349
            metadata: Some(metadata),
×
350
            is_synced,
×
351
        })
×
352
    }
×
353

354
    async fn get_header_by_height(&self, height: u64) -> Result<models::BlockHeader, Self::Error> {
×
355
        let result = self
×
356
            .db
×
357
            .fetch_header(height)
×
358
            .await?
×
359
            .ok_or(Error::HeaderNotFound { height })?
×
360
            .into();
×
361
        Ok(result)
×
362
    }
×
363

364
    async fn get_height_at_time(&self, epoch_time: u64) -> Result<u64, Self::Error> {
×
365
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", epoch_time);
×
366
        let tip_header = self.db.fetch_tip_header().await?;
×
367

368
        let mut left_height = 0u64;
×
369
        let mut right_height = tip_header.height();
×
370

371
        while left_height <= right_height {
×
372
            let mut mid_height = (left_height + right_height) / 2;
×
373

374
            if mid_height == 0 {
×
375
                return Ok(0u64);
×
376
            }
×
377
            // If the two bounds are adjacent then perform the test between the right and left sides
378
            if left_height == mid_height {
×
379
                mid_height = right_height;
×
380
            }
×
381

382
            let mid_header = self
×
383
                .db
×
384
                .fetch_header(mid_height)
×
385
                .await?
×
386
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height })?;
×
387
            let before_mid_header = self
×
388
                .db
×
389
                .fetch_header(mid_height - 1)
×
390
                .await?
×
391
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height - 1 })?;
×
392
            trace!(
×
393
                target: LOG_TARGET,
×
394
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
395
                epoch_time,
396
                left_height,
397
                mid_height,
398
                mid_height-1,
×
399
                mid_header.timestamp.as_u64(),
×
400
                before_mid_header.timestamp.as_u64(),
×
401
                right_height
402
            );
403
            if epoch_time < mid_header.timestamp.as_u64() && epoch_time >= before_mid_header.timestamp.as_u64() {
×
404
                trace!(
×
405
                    target: LOG_TARGET,
×
406
                    "requested_epoch_time: {}, selected height: {}",
×
407
                    epoch_time, before_mid_header.height
408
                );
409
                return Ok(before_mid_header.height);
×
410
            } else if mid_height == right_height {
×
411
                trace!(
×
412
                    target: LOG_TARGET,
×
413
                    "requested_epoch_time: {epoch_time}, selected height: {right_height}"
×
414
                );
415
                return Ok(right_height);
×
416
            } else if epoch_time <= mid_header.timestamp.as_u64() {
×
417
                right_height = mid_height;
×
418
            } else {
×
419
                left_height = mid_height;
×
420
            }
×
421
        }
422

423
        Ok(0u64)
×
424
    }
×
425

426
    async fn transaction_query(
427
        &self,
428
        signature: crate::base_node::rpc::models::Signature,
429
    ) -> Result<TxQueryResponse, Self::Error> {
×
430
        let signature = signature.try_into().map_err(Error::SignatureConversion)?;
×
431

432
        let response = self.fetch_kernel(signature).await?;
×
433

434
        Ok(response)
×
435
    }
×
436

437
    async fn sync_utxos_by_block(
438
        &self,
439
        request: SyncUtxosByBlockRequest,
440
    ) -> Result<SyncUtxosByBlockResponse, Self::Error> {
×
441
        self.fetch_utxos(request).await
×
442
    }
×
443

444
    async fn get_utxos_by_block(
445
        &self,
446
        request: GetUtxosByBlockRequest,
447
    ) -> Result<GetUtxosByBlockResponse, Self::Error> {
×
448
        self.fetch_utxos_by_block(request).await
×
449
    }
×
450

451
    async fn get_utxos_mined_info(
452
        &self,
453
        request: models::GetUtxosMinedInfoRequest,
454
    ) -> Result<models::GetUtxosMinedInfoResponse, Self::Error> {
×
455
        request.validate()?;
×
456

457
        let mut utxos = vec![];
×
458

459
        let tip_header = self.db().fetch_tip_header().await?;
×
460
        for hash in request.hashes {
×
461
            let hash = hash.try_into()?;
×
462
            let output = self.db().fetch_output(hash).await?;
×
463
            if let Some(output) = output {
×
464
                utxos.push(models::MinedUtxoInfo {
×
465
                    utxo_hash: hash.to_vec(),
×
466
                    mined_in_hash: output.header_hash.to_vec(),
×
467
                    mined_in_height: output.mined_height,
×
468
                    mined_in_timestamp: output.mined_timestamp,
×
469
                });
×
470
            }
×
471
        }
472

473
        Ok(models::GetUtxosMinedInfoResponse {
×
474
            utxos,
×
475
            best_block_hash: tip_header.hash().to_vec(),
×
476
            best_block_height: tip_header.height(),
×
477
        })
×
478
    }
×
479

480
    async fn get_utxos_deleted_info(
481
        &self,
482
        request: models::GetUtxosDeletedInfoRequest,
483
    ) -> Result<models::GetUtxosDeletedInfoResponse, Self::Error> {
×
484
        request.validate()?;
×
485

486
        let mut utxos = vec![];
×
487

488
        let must_include_header = request.must_include_header.clone().try_into()?;
×
489
        if self
×
490
            .db()
×
491
            .fetch_header_by_block_hash(must_include_header)
×
492
            .await?
×
493
            .is_none()
×
494
        {
495
            return Err(Error::HeaderHashNotFound);
×
496
        }
×
497

498
        let tip_header = self.db().fetch_tip_header().await?;
×
499
        for hash in request.hashes {
×
500
            let hash = hash.try_into()?;
×
501
            let output = self.db().fetch_output(hash).await?;
×
502

503
            if let Some(output) = output {
×
504
                // is it still unspent?
505
                let input = self.db().fetch_input(hash).await?;
×
506
                if let Some(i) = input {
×
507
                    utxos.push(models::DeletedUtxoInfo {
×
508
                        utxo_hash: hash.to_vec(),
×
509
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
510
                        spent_in_header: Some((i.spent_height, i.header_hash.to_vec())),
×
511
                    });
×
512
                } else {
×
513
                    utxos.push(models::DeletedUtxoInfo {
×
514
                        utxo_hash: hash.to_vec(),
×
515
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
516
                        spent_in_header: None,
×
517
                    });
×
518
                }
×
519
            } else {
×
520
                utxos.push(models::DeletedUtxoInfo {
×
521
                    utxo_hash: hash.to_vec(),
×
522
                    found_in_header: None,
×
523
                    spent_in_header: None,
×
524
                });
×
525
            }
×
526
        }
527

528
        Ok(models::GetUtxosDeletedInfoResponse {
×
529
            utxos,
×
530
            best_block_hash: tip_header.hash().to_vec(),
×
531
            best_block_height: tip_header.height(),
×
532
        })
×
533
    }
×
534

535
    async fn generate_kernel_merkle_proof(
536
        &self,
537
        excess_sig: types::CompressedSignature,
538
    ) -> Result<GenerateKernelMerkleProofResponse, Self::Error> {
×
539
        let proof = self.db().generate_kernel_merkle_proof(excess_sig).await?;
×
540
        Ok(GenerateKernelMerkleProofResponse {
541
            encoded_merkle_proof: bincode::serialize(&proof.merkle_proof).map_err(Error::general)?,
×
542
            block_hash: proof.block_hash,
×
543
            leaf_index: proof.leaf_index.value() as u64,
×
544
        })
545
    }
×
546
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc