• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 17033178607

18 Aug 2025 06:45AM UTC coverage: 54.49% (-0.007%) from 54.497%
17033178607

push

github

stringhandler
Merge branch 'development' of github.com:tari-project/tari into odev

971 of 2923 new or added lines in 369 files covered. (33.22%)

5804 existing lines in 173 files now uncovered.

76688 of 140739 relevant lines covered (54.49%)

193850.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/query_service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::cmp;
5

6
use log::trace;
7
use serde_valid::{validation, Validate};
8
use tari_common_types::{types, types::FixedHashSizeError};
9
use tari_utilities::{hex::Hex, ByteArray, ByteArrayError};
10
use thiserror::Error;
11

12
use crate::{
13
    base_node::{
14
        rpc::{
15
            models::{
16
                self,
17
                BlockUtxoInfo,
18
                GetUtxosByBlockRequest,
19
                GetUtxosByBlockResponse,
20
                MinimalUtxoSyncInfo,
21
                SyncUtxosByBlockRequest,
22
                SyncUtxosByBlockResponse,
23
                TipInfoResponse,
24
                TxLocation,
25
                TxQueryResponse,
26
            },
27
            BaseNodeWalletQueryService,
28
        },
29
        state_machine_service::states::StateInfo,
30
        StateMachineHandle,
31
    },
32
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
33
    mempool::{service::MempoolHandle, MempoolServiceError, TxStorageResponse},
34
    transactions::transaction_components::TransactionOutput,
35
};
36

37
const LOG_TARGET: &str = "c::bn::rpc::query_service";
38

39
#[derive(Debug, Error)]
40
pub enum Error {
41
    #[error("Failed to get chain metadata: {0}")]
42
    FailedToGetChainMetadata(#[from] ChainStorageError),
43
    #[error("Header not found at height: {height}")]
44
    HeaderNotFound { height: u64 },
45
    #[error("Signature conversion error: {0}")]
46
    SignatureConversion(ByteArrayError),
47
    #[error("Mempool service error: {0}")]
48
    MempoolService(#[from] MempoolServiceError),
49
    #[error("Serde validation error: {0}")]
50
    SerdeValidation(#[from] validation::Errors),
51
    #[error("Hash conversion error: {0}")]
52
    HashConversion(#[from] FixedHashSizeError),
53
    #[error("Start header hash not found")]
54
    StartHeaderHashNotFound,
55
    #[error("End header hash not found")]
56
    EndHeaderHashNotFound,
57
    #[error("Header hash not found")]
58
    HeaderHashNotFound,
59
    #[error("Start header height {start_height} cannot be greater than the end header height {end_height}")]
60
    HeaderHeightMismatch { start_height: u64, end_height: u64 },
61
}
62

63
pub struct Service<B> {
64
    db: AsyncBlockchainDb<B>,
65
    state_machine: StateMachineHandle,
66
    mempool: MempoolHandle,
67
}
68

69
impl<B: BlockchainBackend + 'static> Service<B> {
70
    pub fn new(db: AsyncBlockchainDb<B>, state_machine: StateMachineHandle, mempool: MempoolHandle) -> Self {
×
71
        Self {
×
72
            db,
×
73
            state_machine,
×
74
            mempool,
×
75
        }
×
76
    }
×
77

78
    fn state_machine(&self) -> StateMachineHandle {
×
79
        self.state_machine.clone()
×
80
    }
×
81

82
    fn db(&self) -> &AsyncBlockchainDb<B> {
×
83
        &self.db
×
84
    }
×
85

86
    fn mempool(&self) -> MempoolHandle {
×
87
        self.mempool.clone()
×
88
    }
×
89

90
    async fn fetch_kernel(&self, signature: types::Signature) -> Result<TxQueryResponse, Error> {
×
91
        let db = self.db();
×
92

×
93
        match db.fetch_kernel_by_excess_sig(signature.clone()).await? {
×
94
            None => (),
×
95
            Some((_, block_hash)) => match db.fetch_header_by_block_hash(block_hash).await? {
×
96
                None => (),
×
97
                Some(header) => {
×
98
                    let response = TxQueryResponse {
×
99
                        location: TxLocation::Mined,
×
100
                        mined_header_hash: Some(block_hash.to_vec()),
×
101
                        mined_height: Some(header.height),
×
102
                        mined_timestamp: Some(header.timestamp.as_u64()),
×
103
                    };
×
104
                    return Ok(response);
×
105
                },
106
            },
107
        };
108

109
        // If not in a block then check the mempool
110
        let mut mempool = self.mempool();
×
111
        let mempool_response = match mempool.get_tx_state_by_excess_sig(signature.clone()).await? {
×
112
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
×
113
                location: TxLocation::InMempool,
×
114
                mined_header_hash: None,
×
115
                mined_height: None,
×
116
                mined_timestamp: None,
×
117
            },
×
118
            TxStorageResponse::ReorgPool |
119
            TxStorageResponse::NotStoredOrphan |
120
            TxStorageResponse::NotStoredTimeLocked |
121
            TxStorageResponse::NotStoredAlreadySpent |
122
            TxStorageResponse::NotStoredConsensus |
123
            TxStorageResponse::NotStored |
124
            TxStorageResponse::NotStoredFeeTooLow |
125
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
×
126
                location: TxLocation::NotStored,
×
127
                mined_timestamp: None,
×
128
                mined_height: None,
×
129
                mined_header_hash: None,
×
130
            },
×
131
        };
132

133
        Ok(mempool_response)
×
134
    }
×
135

136
    async fn fetch_utxos_by_block(&self, request: GetUtxosByBlockRequest) -> Result<GetUtxosByBlockResponse, Error> {
×
137
        request.validate()?;
×
138

139
        let hash = request.header_hash.clone().try_into()?;
×
140

141
        let header = self
×
142
            .db()
×
143
            .fetch_header_by_block_hash(hash)
×
144
            .await?
×
145
            .ok_or_else(|| Error::HeaderHashNotFound)?;
×
146

147
        // fetch utxos
148
        let outputs_with_statuses = self.db.fetch_outputs_in_block_with_spend_state(hash, None).await?;
×
149

150
        let outputs = outputs_with_statuses
×
151
            .into_iter()
×
152
            .map(|(output, _spent)| output)
×
153
            .collect::<Vec<TransactionOutput>>();
×
154

×
155
        // if its empty, we need to send an empty vec of outputs.
×
156
        let utxo_block_response = GetUtxosByBlockResponse {
×
157
            outputs,
×
158
            height: header.height,
×
159
            header_hash: hash.to_vec(),
×
160
            mined_timestamp: header.timestamp.as_u64(),
×
161
        };
×
162

×
163
        Ok(utxo_block_response)
×
164
    }
×
165

166
    #[allow(clippy::too_many_lines)]
167
    async fn fetch_utxos(&self, request: SyncUtxosByBlockRequest) -> Result<SyncUtxosByBlockResponse, Error> {
×
168
        // validate and fetch inputs
×
169
        request.validate()?;
×
170

171
        let hash = request.start_header_hash.clone().try_into()?;
×
172

173
        let start_header = self
×
174
            .db()
×
175
            .fetch_header_by_block_hash(hash)
×
176
            .await?
×
177
            .ok_or_else(|| Error::StartHeaderHashNotFound)?;
×
178

179
        let tip_header = self.db.fetch_tip_header().await?;
×
180
        // we only allow wallets to ask for a max of 100 blocks at a time and we want to cache the queries to ensure
181
        // they are in batch of 100 and we want to ensure they request goes to the nearest 100 block height so
182
        // we can cache all wallet's queries
183
        let increase = ((start_header.height + 100) / 100) * 100;
×
184
        let end_height = cmp::min(tip_header.header().height, increase);
×
185

×
186
        // pagination
×
187
        let start_header_height = start_header.height + (request.page * request.limit);
×
188
        let start_header = self
×
189
            .db
×
190
            .fetch_header(start_header_height)
×
191
            .await?
×
192
            .ok_or_else(|| Error::HeaderNotFound {
×
193
                height: start_header_height,
×
194
            })?;
×
195

196
        if start_header.height > tip_header.header().height {
×
197
            return Err(Error::HeaderHeightMismatch {
×
198
                start_height: start_header.height,
×
199
                end_height: tip_header.header().height,
×
200
            });
×
201
        }
×
202

×
203
        // fetch utxos
×
204
        let mut utxos = vec![];
×
205
        let mut current_header = start_header;
×
206
        let mut fetched_utxos = 0;
×
207
        let next_header_to_request;
208
        loop {
209
            let current_header_hash = current_header.hash();
×
210

×
211
            trace!(
×
212
                target: LOG_TARGET,
×
213
                "current header = {} ({})",
×
214
                current_header.height,
×
215
                current_header_hash.to_hex()
×
216
            );
217

218
            let outputs_with_statuses = self
×
219
                .db
×
220
                .fetch_outputs_in_block_with_spend_state(current_header.hash(), None)
×
221
                .await?;
×
222
            let mut inputs = self
×
223
                .db
×
224
                .fetch_inputs_in_block(current_header.hash())
×
225
                .await?
×
226
                .into_iter()
×
227
                .map(|input| input.output_hash().to_vec())
×
228
                .collect::<Vec<Vec<u8>>>();
×
229

×
230
            let outputs = outputs_with_statuses
×
231
                .into_iter()
×
232
                .map(|(output, _spent)| output)
×
233
                .collect::<Vec<TransactionOutput>>();
×
234

235
            for output_chunk in outputs.chunks(2000) {
×
236
                let inputs_to_send = if inputs.is_empty() {
×
237
                    Vec::new()
×
238
                } else {
239
                    let num_to_drain = inputs.len().min(2000);
×
240
                    inputs.drain(..num_to_drain).collect()
×
241
                };
242

243
                let output_block_response = BlockUtxoInfo {
×
244
                    outputs: output_chunk
×
245
                        .iter()
×
246
                        .map(|output| MinimalUtxoSyncInfo {
×
247
                            output_hash: output.hash().to_vec(),
×
248
                            commitment: output.commitment().to_vec(),
×
249
                            encrypted_data: output.encrypted_data().as_bytes().to_vec(),
×
250
                            sender_offset_public_key: output.sender_offset_public_key.to_vec(),
×
251
                        })
×
252
                        .collect(),
×
253
                    inputs: inputs_to_send,
×
254
                    height: current_header.height,
×
255
                    header_hash: current_header_hash.to_vec(),
×
256
                    mined_timestamp: current_header.timestamp.as_u64(),
×
257
                };
×
258
                utxos.push(output_block_response);
×
259
            }
×
260
            // We might still have inputs left to send if they are more than the outputs
261
            for input_chunk in inputs.chunks(2000) {
×
262
                let output_block_response = BlockUtxoInfo {
×
263
                    outputs: Vec::new(),
×
264
                    inputs: input_chunk.to_vec(),
×
265
                    height: current_header.height,
×
266
                    header_hash: current_header_hash.to_vec(),
×
267
                    mined_timestamp: current_header.timestamp.as_u64(),
×
268
                };
×
269
                utxos.push(output_block_response);
×
270
            }
×
271

272
            fetched_utxos += 1;
×
273

×
274
            if current_header.height >= tip_header.header().height {
×
275
                next_header_to_request = vec![];
×
276
                break;
×
277
            }
×
278
            if fetched_utxos >= request.limit {
×
279
                next_header_to_request = current_header.hash().to_vec();
×
280
                break;
×
281
            }
×
282

283
            current_header =
×
284
                self.db
×
285
                    .fetch_header(current_header.height + 1)
×
286
                    .await?
×
287
                    .ok_or_else(|| Error::HeaderNotFound {
×
288
                        height: current_header.height + 1,
×
289
                    })?;
×
290
            if current_header.height == end_height {
×
291
                next_header_to_request = current_header.hash().to_vec();
×
292
                break; // Stop if we reach the end height}
×
293
            }
×
294
        }
295

296
        let has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
×
297

×
298
        Ok(SyncUtxosByBlockResponse {
×
299
            blocks: utxos,
×
300
            has_next_page,
×
301
            next_header_to_scan: next_header_to_request,
×
302
        })
×
303
    }
×
304
}
305

306
#[async_trait::async_trait]
307
impl<B: BlockchainBackend + 'static> BaseNodeWalletQueryService for Service<B> {
308
    type Error = Error;
309

310
    async fn get_tip_info(&self) -> Result<TipInfoResponse, Self::Error> {
×
311
        let state_machine = self.state_machine();
×
312
        let status_watch = state_machine.get_status_info_watch();
×
313
        let is_synced = match status_watch.borrow().state_info {
×
314
            StateInfo::Listening(li) => li.is_synced(),
×
315
            _ => false,
×
316
        };
317

318
        let metadata = self.db.get_chain_metadata().await?;
×
319

320
        Ok(TipInfoResponse {
×
321
            metadata: Some(metadata),
×
322
            is_synced,
×
323
        })
×
324
    }
×
325

326
    async fn get_header_by_height(&self, height: u64) -> Result<models::BlockHeader, Self::Error> {
×
327
        let result = self
×
328
            .db
×
329
            .fetch_header(height)
×
330
            .await?
×
331
            .ok_or(Error::HeaderNotFound { height })?
×
332
            .into();
×
333
        Ok(result)
×
334
    }
×
335

336
    async fn get_height_at_time(&self, epoch_time: u64) -> Result<u64, Self::Error> {
×
337
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", epoch_time);
×
338
        let tip_header = self.db.fetch_tip_header().await?;
×
339

340
        let mut left_height = 0u64;
×
341
        let mut right_height = tip_header.height();
×
342

343
        while left_height <= right_height {
×
344
            let mut mid_height = (left_height + right_height) / 2;
×
345

×
346
            if mid_height == 0 {
×
347
                return Ok(0u64);
×
348
            }
×
349
            // If the two bounds are adjacent then perform the test between the right and left sides
×
350
            if left_height == mid_height {
×
351
                mid_height = right_height;
×
352
            }
×
353

354
            let mid_header = self
×
355
                .db
×
356
                .fetch_header(mid_height)
×
357
                .await?
×
358
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height })?;
×
359
            let before_mid_header = self
×
360
                .db
×
361
                .fetch_header(mid_height - 1)
×
362
                .await?
×
363
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height - 1 })?;
×
364
            trace!(
×
365
                target: LOG_TARGET,
×
366
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
367
                epoch_time,
×
368
                left_height,
×
369
                mid_height,
×
370
                mid_height-1,
×
371
                mid_header.timestamp.as_u64(),
×
372
                before_mid_header.timestamp.as_u64(),
×
373
                right_height
374
            );
375
            if epoch_time < mid_header.timestamp.as_u64() && epoch_time >= before_mid_header.timestamp.as_u64() {
×
376
                trace!(
×
377
                    target: LOG_TARGET,
×
378
                    "requested_epoch_time: {}, selected height: {}",
×
379
                    epoch_time, before_mid_header.height
380
                );
381
                return Ok(before_mid_header.height);
×
382
            } else if mid_height == right_height {
×
383
                trace!(
×
384
                    target: LOG_TARGET,
×
NEW
385
                    "requested_epoch_time: {epoch_time}, selected height: {right_height}"
×
386
                );
387
                return Ok(right_height);
×
388
            } else if epoch_time <= mid_header.timestamp.as_u64() {
×
389
                right_height = mid_height;
×
390
            } else {
×
391
                left_height = mid_height;
×
UNCOV
392
            }
×
393
        }
394

395
        Ok(0u64)
×
UNCOV
396
    }
×
397

398
    async fn transaction_query(
399
        &self,
400
        signature: crate::base_node::rpc::models::Signature,
401
    ) -> Result<TxQueryResponse, Self::Error> {
×
UNCOV
402
        let signature = signature.try_into().map_err(Error::SignatureConversion)?;
×
403

UNCOV
404
        let response = self.fetch_kernel(signature).await?;
×
405

406
        Ok(response)
×
UNCOV
407
    }
×
408

409
    async fn sync_utxos_by_block(
410
        &self,
411
        request: SyncUtxosByBlockRequest,
412
    ) -> Result<SyncUtxosByBlockResponse, Self::Error> {
×
413
        self.fetch_utxos(request).await
×
UNCOV
414
    }
×
415

416
    async fn get_utxos_by_block(
417
        &self,
418
        request: GetUtxosByBlockRequest,
419
    ) -> Result<GetUtxosByBlockResponse, Self::Error> {
×
420
        self.fetch_utxos_by_block(request).await
×
UNCOV
421
    }
×
422

423
    async fn get_utxos_mined_info(
424
        &self,
425
        request: models::GetUtxosMinedInfoRequest,
426
    ) -> Result<models::GetUtxosMinedInfoResponse, Self::Error> {
×
UNCOV
427
        request.validate()?;
×
428

UNCOV
429
        let mut utxos = vec![];
×
430

431
        let tip_header = self.db().fetch_tip_header().await?;
×
432
        for hash in request.hashes {
×
433
            let hash = hash.try_into()?;
×
434
            let output = self.db().fetch_output(hash).await?;
×
435
            if let Some(output) = output {
×
436
                utxos.push(models::MinedUtxoInfo {
×
437
                    utxo_hash: hash.to_vec(),
×
438
                    mined_in_hash: output.header_hash.to_vec(),
×
439
                    mined_in_height: output.mined_height,
×
440
                    mined_in_timestamp: output.mined_timestamp,
×
441
                });
×
UNCOV
442
            }
×
443
        }
444

445
        Ok(models::GetUtxosMinedInfoResponse {
×
446
            utxos,
×
447
            best_block_hash: tip_header.hash().to_vec(),
×
448
            best_block_height: tip_header.height(),
×
449
        })
×
UNCOV
450
    }
×
451

452
    async fn get_utxos_deleted_info(
453
        &self,
454
        request: models::GetUtxosDeletedInfoRequest,
455
    ) -> Result<models::GetUtxosDeletedInfoResponse, Self::Error> {
×
UNCOV
456
        request.validate()?;
×
457

UNCOV
458
        let mut utxos = vec![];
×
459

460
        let must_include_header = request.must_include_header.clone().try_into()?;
×
461
        if self
×
462
            .db()
×
463
            .fetch_header_by_block_hash(must_include_header)
×
464
            .await?
×
UNCOV
465
            .is_none()
×
466
        {
467
            return Err(Error::HeaderHashNotFound);
×
UNCOV
468
        }
×
469

470
        let tip_header = self.db().fetch_tip_header().await?;
×
471
        for hash in request.hashes {
×
472
            let hash = hash.try_into()?;
×
UNCOV
473
            let output = self.db().fetch_output(hash).await?;
×
474

UNCOV
475
            if let Some(output) = output {
×
476
                // is it still unspent?
477
                let input = self.db().fetch_input(hash).await?;
×
478
                if let Some(i) = input {
×
479
                    utxos.push(models::DeletedUtxoInfo {
×
480
                        utxo_hash: hash.to_vec(),
×
481
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
482
                        spent_in_header: Some((i.spent_height, i.header_hash.to_vec())),
×
483
                    });
×
484
                } else {
×
485
                    utxos.push(models::DeletedUtxoInfo {
×
486
                        utxo_hash: hash.to_vec(),
×
487
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
488
                        spent_in_header: None,
×
489
                    });
×
490
                }
×
491
            } else {
×
492
                utxos.push(models::DeletedUtxoInfo {
×
493
                    utxo_hash: hash.to_vec(),
×
494
                    found_in_header: None,
×
495
                    spent_in_header: None,
×
496
                });
×
UNCOV
497
            }
×
498
        }
499

500
        Ok(models::GetUtxosDeletedInfoResponse {
×
501
            utxos,
×
502
            best_block_hash: tip_header.hash().to_vec(),
×
503
            best_block_height: tip_header.height(),
×
504
        })
×
UNCOV
505
    }
×
506
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc