• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16350465781

17 Jul 2025 04:20PM UTC coverage: 57.753% (+0.01%) from 57.741%
16350465781

push

github

web-flow
feat: add output hash of inputs to scanning stream (#7334)

Description
---
Adds spent output hashes to the sync-UTXOs-by-block stream for the UTXO
scanner.
Fixes limits.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Added an `inputs` field to UTXO sync responses, allowing clients to
receive input data alongside outputs when syncing by block.

* **Improvements**
* Enhanced UTXO sync logic to ensure all inputs are included and chunked
appropriately, even if their count exceeds outputs.
* Applied a global request body size limit of 4 MiB to the HTTP server
for improved consistency.

* **Tests**
* Updated test mocks to include the new `inputs` field in UTXO sync
responses.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

0 of 16 new or added lines in 1 file covered. (0.0%)

6 existing lines in 2 files now uncovered.

70116 of 121406 relevant lines covered (57.75%)

224179.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/query_service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use log::trace;
5
use serde_valid::{validation, Validate};
6
use tari_common_types::{types, types::FixedHashSizeError};
7
use tari_utilities::{hex::Hex, ByteArray, ByteArrayError};
8
use thiserror::Error;
9

10
use crate::{
11
    base_node::{
12
        rpc::{
13
            models::{
14
                self,
15
                BlockUtxoInfo,
16
                GetUtxosByBlockRequest,
17
                GetUtxosByBlockResponse,
18
                MinimalUtxoSyncInfo,
19
                SyncUtxosByBlockRequest,
20
                SyncUtxosByBlockResponse,
21
                TipInfoResponse,
22
                TxLocation,
23
                TxQueryResponse,
24
            },
25
            BaseNodeWalletQueryService,
26
        },
27
        state_machine_service::states::StateInfo,
28
        StateMachineHandle,
29
    },
30
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
31
    mempool::{service::MempoolHandle, MempoolServiceError, TxStorageResponse},
32
    transactions::transaction_components::TransactionOutput,
33
};
34

35
/// Log target passed to the `trace!` calls in this module.
const LOG_TARGET: &str = "c::bn::rpc::query_service";
36

37
/// Errors returned by the wallet query [`Service`].
#[derive(Debug, Error)]
38
pub enum Error {
39
    /// A chain storage operation failed. Because of `#[from]`, every `ChainStorageError` propagated with `?` in this
    /// module ends up in this variant, not only chain-metadata lookups.
    #[error("Failed to get chain metadata: {0}")]
40
    FailedToGetChainMetadata(#[from] ChainStorageError),
41
    /// No header is stored at the given height.
    #[error("Header not found at height: {height}")]
42
    HeaderNotFound { height: u64 },
43
    /// The signature supplied by the caller could not be converted into the internal signature type.
    #[error("Signature conversion error: {0}")]
44
    SignatureConversion(ByteArrayError),
45
    /// The mempool service returned an error.
    #[error("Mempool service error: {0}")]
46
    MempoolService(#[from] MempoolServiceError),
47
    /// A request failed its `validate()` check.
    #[error("Serde validation error: {0}")]
48
    SerdeValidation(#[from] validation::Errors),
49
    /// A byte vector supplied by the caller could not be converted into a fixed-size hash.
    #[error("Hash conversion error: {0}")]
50
    HashConversion(#[from] FixedHashSizeError),
51
    /// The start header hash of a sync request does not match any stored header.
    #[error("Start header hash not found")]
52
    StartHeaderHashNotFound,
53
    /// The end header hash of a sync request does not match any stored header.
    #[error("End header hash not found")]
54
    EndHeaderHashNotFound,
55
    /// A header hash supplied by the caller does not match any stored header.
    #[error("Header hash not found")]
56
    HeaderHashNotFound,
57
    /// The (paginated) start height of a sync request lies above the end height.
    #[error("Start header height {start_height} cannot be greater than the end header height {end_height}")]
58
    HeaderHeightMismatch { start_height: u64, end_height: u64 },
59
}
60

61
/// Query service backing the `BaseNodeWalletQueryService` implementation below.
///
/// Bundles the handles the queries need: the blockchain database, the state machine (for sync status) and the
/// mempool (for unmined transaction lookups).
pub struct Service<B> {
62
    // Async handle to the blockchain database.
    db: AsyncBlockchainDb<B>,
63
    // Handle used to read the node's current state info.
    state_machine: StateMachineHandle,
64
    // Handle used to query transaction state in the mempool.
    mempool: MempoolHandle,
65
}
66

67
impl<B: BlockchainBackend + 'static> Service<B> {
68
    /// Creates a query service from the given database, state machine and mempool handles.
    pub fn new(db: AsyncBlockchainDb<B>, state_machine: StateMachineHandle, mempool: MempoolHandle) -> Self {
×
69
        Self {
×
70
            db,
×
71
            state_machine,
×
72
            mempool,
×
73
        }
×
74
    }
×
75

76
    /// Returns a clone of the state machine handle.
    fn state_machine(&self) -> StateMachineHandle {
×
77
        self.state_machine.clone()
×
78
    }
×
79

80
    /// Borrows the blockchain database handle.
    fn db(&self) -> &AsyncBlockchainDb<B> {
×
81
        &self.db
×
82
    }
×
83

84
    /// Returns a clone of the mempool handle.
    fn mempool(&self) -> MempoolHandle {
×
85
        self.mempool.clone()
×
86
    }
×
87

88
    async fn fetch_kernel(&self, signature: types::Signature) -> Result<TxQueryResponse, Error> {
×
89
        let db = self.db();
×
90

×
91
        match db.fetch_kernel_by_excess_sig(signature.clone()).await? {
×
92
            None => (),
×
93
            Some((_, block_hash)) => match db.fetch_header_by_block_hash(block_hash).await? {
×
94
                None => (),
×
95
                Some(header) => {
×
96
                    let response = TxQueryResponse {
×
97
                        location: TxLocation::Mined,
×
98
                        mined_header_hash: Some(block_hash.to_vec()),
×
99
                        mined_height: Some(header.height),
×
100
                        mined_timestamp: Some(header.timestamp.as_u64()),
×
101
                    };
×
102
                    return Ok(response);
×
103
                },
104
            },
105
        };
106

107
        // If not in a block then check the mempool
108
        let mut mempool = self.mempool();
×
109
        let mempool_response = match mempool.get_tx_state_by_excess_sig(signature.clone()).await? {
×
110
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
×
111
                location: TxLocation::InMempool,
×
112
                mined_header_hash: None,
×
113
                mined_height: None,
×
114
                mined_timestamp: None,
×
115
            },
×
116
            TxStorageResponse::ReorgPool |
117
            TxStorageResponse::NotStoredOrphan |
118
            TxStorageResponse::NotStoredTimeLocked |
119
            TxStorageResponse::NotStoredAlreadySpent |
120
            TxStorageResponse::NotStoredConsensus |
121
            TxStorageResponse::NotStored |
122
            TxStorageResponse::NotStoredFeeTooLow |
123
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
×
124
                location: TxLocation::NotStored,
×
125
                mined_timestamp: None,
×
126
                mined_height: None,
×
127
                mined_header_hash: None,
×
128
            },
×
129
        };
130

131
        Ok(mempool_response)
×
132
    }
×
133

134
    async fn fetch_utxos_by_block(&self, request: GetUtxosByBlockRequest) -> Result<GetUtxosByBlockResponse, Error> {
×
135
        request.validate()?;
×
136

137
        let hash = request.header_hash.clone().try_into()?;
×
138

139
        let header = self
×
140
            .db()
×
141
            .fetch_header_by_block_hash(hash)
×
142
            .await?
×
143
            .ok_or_else(|| Error::HeaderHashNotFound)?;
×
144

145
        // fetch utxos
146
        let outputs_with_statuses = self.db.fetch_outputs_in_block_with_spend_state(hash, None).await?;
×
147

148
        let outputs = outputs_with_statuses
×
149
            .into_iter()
×
150
            .map(|(output, _spent)| output)
×
151
            .collect::<Vec<TransactionOutput>>();
×
152

×
153
        // if its empty, we need to send an empty vec of outputs.
×
154
        let utxo_block_response = GetUtxosByBlockResponse {
×
155
            outputs,
×
156
            height: header.height,
×
157
            header_hash: hash.to_vec(),
×
158
            mined_timestamp: header.timestamp.as_u64(),
×
159
        };
×
160

×
161
        Ok(utxo_block_response)
×
162
    }
×
163

164
    #[allow(clippy::too_many_lines)]
165
    async fn fetch_utxos(&self, request: SyncUtxosByBlockRequest) -> Result<SyncUtxosByBlockResponse, Error> {
×
166
        // validate and fetch inputs
×
167
        request.validate()?;
×
168

169
        let hash = request.start_header_hash.clone().try_into()?;
×
170

171
        let start_header = self
×
172
            .db()
×
173
            .fetch_header_by_block_hash(hash)
×
174
            .await?
×
175
            .ok_or_else(|| Error::StartHeaderHashNotFound)?;
×
176

177
        let hash = request.end_header_hash.clone().try_into()?;
×
178

179
        let end_header = self
×
180
            .db
×
181
            .fetch_header_by_block_hash(hash)
×
182
            .await?
×
183
            .ok_or_else(|| Error::EndHeaderHashNotFound)?;
×
184

185
        // pagination
186
        let start_header_height = start_header.height + (request.page * request.limit);
×
187
        let start_header = self
×
188
            .db
×
189
            .fetch_header(start_header_height)
×
190
            .await?
×
191
            .ok_or_else(|| Error::HeaderNotFound {
×
192
                height: start_header_height,
×
193
            })?;
×
194

195
        if start_header.height > end_header.height {
×
196
            return Err(Error::HeaderHeightMismatch {
×
197
                start_height: start_header.height,
×
198
                end_height: end_header.height,
×
199
            });
×
200
        }
×
201

×
202
        // fetch utxos
×
203
        let mut utxos = vec![];
×
204
        let mut current_header = start_header;
×
205
        let mut fetched_utxos = 0;
×
206
        loop {
207
            let current_header_hash = current_header.hash();
×
208

×
209
            trace!(
×
210
                target: LOG_TARGET,
×
211
                "current header = {} ({})",
×
212
                current_header.height,
×
213
                current_header_hash.to_hex()
×
214
            );
215

216
            let outputs_with_statuses = self
×
217
                .db
×
218
                .fetch_outputs_in_block_with_spend_state(current_header.hash(), None)
×
219
                .await?;
×
NEW
220
            let mut inputs = self
×
NEW
221
                .db
×
NEW
222
                .fetch_inputs_in_block(current_header.hash())
×
NEW
223
                .await?
×
NEW
224
                .into_iter()
×
NEW
225
                .map(|input| input.output_hash().to_vec())
×
NEW
226
                .collect::<Vec<Vec<u8>>>();
×
227

×
228
            let outputs = outputs_with_statuses
×
229
                .into_iter()
×
230
                .map(|(output, _spent)| output)
×
231
                .collect::<Vec<TransactionOutput>>();
×
232

233
            for output_chunk in outputs.chunks(2000) {
×
NEW
234
                let inputs_to_send = if inputs.is_empty() {
×
NEW
235
                    Vec::new()
×
236
                } else {
NEW
237
                    let num_to_drain = inputs.len().min(2000);
×
NEW
238
                    inputs.drain(..num_to_drain).collect()
×
239
                };
240

241
                let output_block_response = BlockUtxoInfo {
×
242
                    outputs: output_chunk
×
243
                        .iter()
×
244
                        .map(|output| MinimalUtxoSyncInfo {
×
245
                            output_hash: output.hash().to_vec(),
×
246
                            commitment: output.commitment().to_vec(),
×
247
                            encrypted_data: output.encrypted_data().as_bytes().to_vec(),
×
248
                            sender_offset_public_key: output.sender_offset_public_key.to_vec(),
×
249
                        })
×
250
                        .collect(),
×
NEW
251
                    inputs: inputs_to_send,
×
252
                    height: current_header.height,
×
253
                    header_hash: current_header_hash.to_vec(),
×
254
                    mined_timestamp: current_header.timestamp.as_u64(),
×
255
                };
×
256
                utxos.push(output_block_response);
×
257
            }
×
258
            // We might still have inputs left to send if they are more than the outputs
NEW
259
            for input_chunk in inputs.chunks(2000) {
×
NEW
260
                let output_block_response = BlockUtxoInfo {
×
261
                    outputs: Vec::new(),
×
NEW
262
                    inputs: input_chunk.to_vec(),
×
263
                    height: current_header.height,
×
264
                    header_hash: current_header_hash.to_vec(),
×
265
                    mined_timestamp: current_header.timestamp.as_u64(),
×
266
                };
×
NEW
267
                utxos.push(output_block_response);
×
268
            }
×
269

270
            fetched_utxos += 1;
×
271

×
272
            if current_header.height >= end_header.height || fetched_utxos >= request.limit {
×
273
                break;
×
274
            }
×
275

276
            current_header =
×
277
                self.db
×
278
                    .fetch_header(current_header.height + 1)
×
279
                    .await?
×
280
                    .ok_or_else(|| Error::HeaderNotFound {
×
281
                        height: current_header.height + 1,
×
282
                    })?;
×
283
        }
284

285
        let has_next_page = (end_header.height - current_header.height) > 0;
×
286

×
287
        Ok(SyncUtxosByBlockResponse {
×
288
            blocks: utxos,
×
289
            has_next_page,
×
290
        })
×
291
    }
×
292
}
293

294
#[async_trait::async_trait]
295
impl<B: BlockchainBackend + 'static> BaseNodeWalletQueryService for Service<B> {
296
    type Error = Error;
297

298
    async fn get_tip_info(&self) -> Result<TipInfoResponse, Self::Error> {
×
299
        let state_machine = self.state_machine();
×
300
        let status_watch = state_machine.get_status_info_watch();
×
301
        let is_synced = match status_watch.borrow().state_info {
×
302
            StateInfo::Listening(li) => li.is_synced(),
×
303
            _ => false,
×
304
        };
305

306
        let metadata = self.db.get_chain_metadata().await?;
×
307

308
        Ok(TipInfoResponse {
×
309
            metadata: Some(metadata),
×
310
            is_synced,
×
311
        })
×
312
    }
×
313

314
    async fn get_header_by_height(&self, height: u64) -> Result<models::BlockHeader, Self::Error> {
×
315
        let result = self
×
316
            .db
×
317
            .fetch_header(height)
×
318
            .await?
×
319
            .ok_or(Error::HeaderNotFound { height })?
×
320
            .into();
×
321
        Ok(result)
×
322
    }
×
323

324
    async fn get_height_at_time(&self, epoch_time: u64) -> Result<u64, Self::Error> {
×
325
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", epoch_time);
×
326
        let tip_header = self.db.fetch_tip_header().await?;
×
327

328
        let mut left_height = 0u64;
×
329
        let mut right_height = tip_header.height();
×
330

331
        while left_height <= right_height {
×
332
            let mut mid_height = (left_height + right_height) / 2;
×
333

×
334
            if mid_height == 0 {
×
335
                return Ok(0u64);
×
336
            }
×
337
            // If the two bounds are adjacent then perform the test between the right and left sides
×
338
            if left_height == mid_height {
×
339
                mid_height = right_height;
×
340
            }
×
341

342
            let mid_header = self
×
343
                .db
×
344
                .fetch_header(mid_height)
×
345
                .await?
×
346
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height })?;
×
347
            let before_mid_header = self
×
348
                .db
×
349
                .fetch_header(mid_height - 1)
×
350
                .await?
×
351
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height - 1 })?;
×
352
            trace!(
×
353
                target: LOG_TARGET,
×
354
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
355
                epoch_time,
×
356
                left_height,
×
357
                mid_height,
×
358
                mid_height-1,
×
359
                mid_header.timestamp.as_u64(),
×
360
                before_mid_header.timestamp.as_u64(),
×
361
                right_height
362
            );
363
            if epoch_time < mid_header.timestamp.as_u64() && epoch_time >= before_mid_header.timestamp.as_u64() {
×
364
                trace!(
×
365
                    target: LOG_TARGET,
×
366
                    "requested_epoch_time: {}, selected height: {}",
×
367
                    epoch_time, before_mid_header.height
368
                );
369
                return Ok(before_mid_header.height);
×
370
            } else if mid_height == right_height {
×
371
                trace!(
×
372
                    target: LOG_TARGET,
×
373
                    "requested_epoch_time: {}, selected height: {}",
×
374
                    epoch_time, right_height
375
                );
376
                return Ok(right_height);
×
377
            } else if epoch_time <= mid_header.timestamp.as_u64() {
×
378
                right_height = mid_height;
×
379
            } else {
×
380
                left_height = mid_height;
×
381
            }
×
382
        }
383

384
        Ok(0u64)
×
385
    }
×
386

387
    async fn transaction_query(
388
        &self,
389
        signature: crate::base_node::rpc::models::Signature,
390
    ) -> Result<TxQueryResponse, Self::Error> {
×
391
        let signature = signature.try_into().map_err(Error::SignatureConversion)?;
×
392

393
        let response = self.fetch_kernel(signature).await?;
×
394

395
        Ok(response)
×
396
    }
×
397

398
    /// Trait entry point for paginated UTXO sync; delegates to `fetch_utxos`.
    async fn sync_utxos_by_block(
399
        &self,
400
        request: SyncUtxosByBlockRequest,
401
    ) -> Result<SyncUtxosByBlockResponse, Self::Error> {
×
402
        self.fetch_utxos(request).await
×
403
    }
×
404

405
    /// Trait entry point for fetching the outputs of a single block; delegates to `fetch_utxos_by_block`.
    async fn get_utxos_by_block(
406
        &self,
407
        request: GetUtxosByBlockRequest,
408
    ) -> Result<GetUtxosByBlockResponse, Self::Error> {
×
409
        self.fetch_utxos_by_block(request).await
×
410
    }
×
411

412
    async fn get_utxos_mined_info(
413
        &self,
414
        request: models::GetUtxosMinedInfoRequest,
415
    ) -> Result<models::GetUtxosMinedInfoResponse, Self::Error> {
×
416
        request.validate()?;
×
417

418
        let mut utxos = vec![];
×
419

420
        let tip_header = self.db().fetch_tip_header().await?;
×
421
        for hash in request.hashes {
×
422
            let hash = hash.try_into()?;
×
423
            let output = self.db().fetch_output(hash).await?;
×
424
            if let Some(output) = output {
×
425
                utxos.push(models::MinedUtxoInfo {
×
426
                    utxo_hash: hash.to_vec(),
×
427
                    mined_in_hash: output.header_hash.to_vec(),
×
428
                    mined_in_height: output.mined_height,
×
429
                    mined_in_timestamp: output.mined_timestamp,
×
430
                });
×
431
            }
×
432
        }
433

434
        Ok(models::GetUtxosMinedInfoResponse {
×
435
            utxos,
×
436
            best_block_hash: tip_header.hash().to_vec(),
×
437
            best_block_height: tip_header.height(),
×
438
        })
×
439
    }
×
440

441
    async fn get_utxos_deleted_info(
442
        &self,
443
        request: models::GetUtxosDeletedInfoRequest,
444
    ) -> Result<models::GetUtxosDeletedInfoResponse, Self::Error> {
×
445
        request.validate()?;
×
446

447
        let mut utxos = vec![];
×
448

449
        let must_include_header = request.must_include_header.clone().try_into()?;
×
450
        if self
×
451
            .db()
×
452
            .fetch_header_by_block_hash(must_include_header)
×
453
            .await?
×
454
            .is_none()
×
455
        {
456
            return Err(Error::HeaderHashNotFound);
×
457
        }
×
458

459
        let tip_header = self.db().fetch_tip_header().await?;
×
460
        for hash in request.hashes {
×
461
            let hash = hash.try_into()?;
×
462
            let output = self.db().fetch_output(hash).await?;
×
463

464
            if let Some(output) = output {
×
465
                // is it still unspent?
466
                let input = self.db().fetch_input(hash).await?;
×
467
                if let Some(i) = input {
×
468
                    utxos.push(models::DeletedUtxoInfo {
×
469
                        utxo_hash: hash.to_vec(),
×
470
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
471
                        spent_in_header: Some((i.spent_height, i.header_hash.to_vec())),
×
472
                    });
×
473
                } else {
×
474
                    utxos.push(models::DeletedUtxoInfo {
×
475
                        utxo_hash: hash.to_vec(),
×
476
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
477
                        spent_in_header: None,
×
478
                    });
×
479
                }
×
480
            } else {
×
481
                utxos.push(models::DeletedUtxoInfo {
×
482
                    utxo_hash: hash.to_vec(),
×
483
                    found_in_header: None,
×
484
                    spent_in_header: None,
×
485
                });
×
486
            }
×
487
        }
488

489
        Ok(models::GetUtxosDeletedInfoResponse {
×
490
            utxos,
×
491
            best_block_hash: tip_header.hash().to_vec(),
×
492
            best_block_height: tip_header.height(),
×
493
        })
×
494
    }
×
495
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc