• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 18097567115

29 Sep 2025 12:50PM UTC coverage: 58.554% (-2.3%) from 60.88%
18097567115

push

github

web-flow
chore(ci): switch rust toolchain to stable (#7524)

Description
Switch the Rust toolchain to stable.

Motivation and Context
Use the stable Rust toolchain.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Chores**
  * Standardized Rust toolchain on stable across CI workflows for more predictable builds.
  * Streamlined setup by removing unnecessary components and aligning toolchain configuration with environment variables.
  * Enabled an environment flag to improve rustup behavior during CI.
  * Improved coverage workflow consistency with dynamic toolchain selection.

* **Tests**
  * Removed nightly-only requirements, simplifying test commands and improving compatibility.
  * Expanded CI triggers to include ci-* branches for better pre-merge validation.
  * Maintained existing job logic while improving reliability and maintainability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

66336 of 113291 relevant lines covered (58.55%)

551641.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/query_service.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::cmp;
5

6
use log::trace;
7
use serde_valid::{validation, Validate};
8
use tari_common_types::{types, types::FixedHashSizeError};
9
use tari_transaction_components::{
10
    rpc::{
11
        models,
12
        models::{
13
            BlockUtxoInfo,
14
            GenerateKernelMerkleProofResponse,
15
            GetUtxosByBlockRequest,
16
            GetUtxosByBlockResponse,
17
            MinimalUtxoSyncInfo,
18
            SyncUtxosByBlockRequest,
19
            SyncUtxosByBlockResponse,
20
            TipInfoResponse,
21
            TxLocation,
22
            TxQueryResponse,
23
        },
24
    },
25
    transaction_components::TransactionOutput,
26
};
27
use tari_utilities::{hex::Hex, ByteArray, ByteArrayError};
28
use thiserror::Error;
29

30
use crate::{
31
    base_node::{rpc::BaseNodeWalletQueryService, state_machine_service::states::StateInfo, StateMachineHandle},
32
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
33
    mempool::{service::MempoolHandle, MempoolServiceError, TxStorageResponse},
34
};
35

36
const LOG_TARGET: &str = "c::bn::rpc::query_service";
37

38
/// Errors returned by the wallet query service defined in this file.
///
/// Variants marked `#[from]` are produced automatically when the `?` operator is applied to a result carrying the
/// wrapped error type.
#[derive(Debug, Error)]
39
pub enum Error {
40
    /// A blockchain storage error.
    // NOTE(review): because of `#[from]`, this variant appears to be produced by `?` on every database call in
    // this file, not only chain metadata lookups, so the message can be misleading - confirm before relying on it.
    #[error("Failed to get chain metadata: {0}")]
41
    FailedToGetChainMetadata(#[from] ChainStorageError),
42
    /// No header was returned by the database for the given height.
    #[error("Header not found at height: {height}")]
43
    HeaderNotFound { height: u64 },
44
    /// The signature supplied to `transaction_query` could not be converted.
    #[error("Signature conversion error: {0}")]
45
    SignatureConversion(ByteArrayError),
46
    /// An error reported by the mempool handle.
    #[error("Mempool service error: {0}")]
47
    MempoolService(#[from] MempoolServiceError),
48
    /// A request failed its `validate()` check.
    #[error("Serde validation error: {0}")]
49
    SerdeValidation(#[from] validation::Errors),
50
    /// A byte vector supplied as a hash could not be converted into a fixed-size hash.
    #[error("Hash conversion error: {0}")]
51
    HashConversion(#[from] FixedHashSizeError),
52
    /// The start header hash of a UTXO sync request does not match any header in the database.
    #[error("Start header hash not found")]
53
    StartHeaderHashNotFound,
54
    // NOTE(review): not constructed anywhere in the code shown here.
    #[error("End header hash not found")]
55
    EndHeaderHashNotFound,
56
    /// The requested header hash does not match any header in the database.
    #[error("Header hash not found")]
57
    HeaderHashNotFound,
58
    /// The paginated start height is greater than the height it is compared against. In `fetch_utxos` the
    /// `end_height` field is populated with the tip height.
    #[error("Start header height {start_height} cannot be greater than the end header height {end_height}")]
59
    HeaderHeightMismatch { start_height: u64, end_height: u64 },
60
    /// Catch-all for errors without a dedicated variant; built through `Error::general`.
    #[error("A general error occurred: {0}")]
61
    General(anyhow::Error),
62
}
63

64
impl Error {
65
    fn general(err: impl Into<anyhow::Error>) -> Self {
×
66
        Error::General(err.into())
×
67
    }
×
68
}
69

70
/// Wallet query service generic over the blockchain backend `B`.
///
/// Holds the three handles that the query methods in this file read from.
pub struct Service<B> {
71
    // Asynchronous blockchain database handle used for header, kernel, output and input lookups.
    db: AsyncBlockchainDb<B>,
72
    // State machine handle; read in `get_tip_info` to derive the sync status.
    state_machine: StateMachineHandle,
73
    // Mempool handle; queried in `fetch_kernel` when a kernel is not found in a block.
    mempool: MempoolHandle,
74
}
75

76
impl<B: BlockchainBackend + 'static> Service<B> {
77
    pub fn new(db: AsyncBlockchainDb<B>, state_machine: StateMachineHandle, mempool: MempoolHandle) -> Self {
×
78
        Self {
×
79
            db,
×
80
            state_machine,
×
81
            mempool,
×
82
        }
×
83
    }
×
84

85
    /// Returns a clone of the state machine handle held by this service.
    fn state_machine(&self) -> StateMachineHandle {
        Clone::clone(&self.state_machine)
    }
×
88

89
    fn db(&self) -> &AsyncBlockchainDb<B> {
×
90
        &self.db
×
91
    }
×
92

93
    /// Returns a clone of the mempool handle held by this service.
    fn mempool(&self) -> MempoolHandle {
        Clone::clone(&self.mempool)
    }
×
96

97
    async fn fetch_kernel(&self, signature: types::CompressedSignature) -> Result<TxQueryResponse, Error> {
×
98
        let db = self.db();
×
99

100
        match db.fetch_kernel_by_excess_sig(signature.clone()).await? {
×
101
            None => (),
×
102
            Some((_, block_hash)) => match db.fetch_header_by_block_hash(block_hash).await? {
×
103
                None => (),
×
104
                Some(header) => {
×
105
                    let response = TxQueryResponse {
×
106
                        location: TxLocation::Mined,
×
107
                        mined_header_hash: Some(block_hash.to_vec()),
×
108
                        mined_height: Some(header.height),
×
109
                        mined_timestamp: Some(header.timestamp.as_u64()),
×
110
                    };
×
111
                    return Ok(response);
×
112
                },
113
            },
114
        };
115

116
        // If not in a block then check the mempool
117
        let mut mempool = self.mempool();
×
118
        let mempool_response = match mempool.get_tx_state_by_excess_sig(signature.clone()).await? {
×
119
            TxStorageResponse::UnconfirmedPool => TxQueryResponse {
×
120
                location: TxLocation::InMempool,
×
121
                mined_header_hash: None,
×
122
                mined_height: None,
×
123
                mined_timestamp: None,
×
124
            },
×
125
            TxStorageResponse::ReorgPool |
126
            TxStorageResponse::NotStoredOrphan |
127
            TxStorageResponse::NotStoredTimeLocked |
128
            TxStorageResponse::NotStoredAlreadySpent |
129
            TxStorageResponse::NotStoredConsensus |
130
            TxStorageResponse::NotStored |
131
            TxStorageResponse::NotStoredFeeTooLow |
132
            TxStorageResponse::NotStoredAlreadyMined => TxQueryResponse {
×
133
                location: TxLocation::NotStored,
×
134
                mined_timestamp: None,
×
135
                mined_height: None,
×
136
                mined_header_hash: None,
×
137
            },
×
138
        };
139

140
        Ok(mempool_response)
×
141
    }
×
142

143
    async fn fetch_utxos_by_block(&self, request: GetUtxosByBlockRequest) -> Result<GetUtxosByBlockResponse, Error> {
×
144
        request.validate()?;
×
145

146
        let hash = request.header_hash.clone().try_into()?;
×
147

148
        let header = self
×
149
            .db()
×
150
            .fetch_header_by_block_hash(hash)
×
151
            .await?
×
152
            .ok_or_else(|| Error::HeaderHashNotFound)?;
×
153

154
        // fetch utxos
155
        let outputs_with_statuses = self.db.fetch_outputs_in_block_with_spend_state(hash, None).await?;
×
156

157
        let outputs = outputs_with_statuses
×
158
            .into_iter()
×
159
            .map(|(output, _spent)| output)
×
160
            .collect::<Vec<TransactionOutput>>();
×
161

162
        // if its empty, we need to send an empty vec of outputs.
163
        let utxo_block_response = GetUtxosByBlockResponse {
×
164
            outputs,
×
165
            height: header.height,
×
166
            header_hash: hash.to_vec(),
×
167
            mined_timestamp: header.timestamp.as_u64(),
×
168
        };
×
169

170
        Ok(utxo_block_response)
×
171
    }
×
172

173
    #[allow(clippy::too_many_lines)]
174
    async fn fetch_utxos(&self, request: SyncUtxosByBlockRequest) -> Result<SyncUtxosByBlockResponse, Error> {
×
175
        // validate and fetch inputs
176
        request.validate()?;
×
177

178
        let hash = request.start_header_hash.clone().try_into()?;
×
179

180
        let start_header = self
×
181
            .db()
×
182
            .fetch_header_by_block_hash(hash)
×
183
            .await?
×
184
            .ok_or_else(|| Error::StartHeaderHashNotFound)?;
×
185

186
        let tip_header = self.db.fetch_tip_header().await?;
×
187
        // we only allow wallets to ask for a max of 100 blocks at a time and we want to cache the queries to ensure
188
        // they are in batch of 100 and we want to ensure they request goes to the nearest 100 block height so
189
        // we can cache all wallet's queries
190
        let increase = ((start_header.height + 100) / 100) * 100;
×
191
        let end_height = cmp::min(tip_header.header().height, increase);
×
192

193
        // pagination
194
        let start_header_height = start_header.height + (request.page * request.limit);
×
195
        let start_header = self
×
196
            .db
×
197
            .fetch_header(start_header_height)
×
198
            .await?
×
199
            .ok_or_else(|| Error::HeaderNotFound {
×
200
                height: start_header_height,
×
201
            })?;
×
202

203
        if start_header.height > tip_header.header().height {
×
204
            return Err(Error::HeaderHeightMismatch {
×
205
                start_height: start_header.height,
×
206
                end_height: tip_header.header().height,
×
207
            });
×
208
        }
×
209

210
        // fetch utxos
211
        let mut utxos = vec![];
×
212
        let mut current_header = start_header;
×
213
        let mut fetched_utxos = 0;
×
214
        let next_header_to_request;
215
        loop {
216
            let current_header_hash = current_header.hash();
×
217

218
            trace!(
×
219
                target: LOG_TARGET,
×
220
                "current header = {} ({})",
×
221
                current_header.height,
222
                current_header_hash.to_hex()
×
223
            );
224

225
            let outputs_with_statuses = self
×
226
                .db
×
227
                .fetch_outputs_in_block_with_spend_state(current_header.hash(), None)
×
228
                .await?;
×
229
            let mut inputs = self
×
230
                .db
×
231
                .fetch_inputs_in_block(current_header.hash())
×
232
                .await?
×
233
                .into_iter()
×
234
                .map(|input| input.output_hash().to_vec())
×
235
                .collect::<Vec<Vec<u8>>>();
×
236

237
            let outputs = outputs_with_statuses
×
238
                .into_iter()
×
239
                .map(|(output, _spent)| output)
×
240
                .collect::<Vec<TransactionOutput>>();
×
241

242
            for output_chunk in outputs.chunks(2000) {
×
243
                let inputs_to_send = if inputs.is_empty() {
×
244
                    Vec::new()
×
245
                } else {
246
                    let num_to_drain = inputs.len().min(2000);
×
247
                    inputs.drain(..num_to_drain).collect()
×
248
                };
249

250
                let output_block_response = BlockUtxoInfo {
×
251
                    outputs: output_chunk
×
252
                        .iter()
×
253
                        .map(|output| MinimalUtxoSyncInfo {
×
254
                            output_hash: output.hash().to_vec(),
×
255
                            commitment: output.commitment().to_vec(),
×
256
                            encrypted_data: output.encrypted_data().as_bytes().to_vec(),
×
257
                            sender_offset_public_key: output.sender_offset_public_key.to_vec(),
×
258
                        })
×
259
                        .collect(),
×
260
                    inputs: inputs_to_send,
×
261
                    height: current_header.height,
×
262
                    header_hash: current_header_hash.to_vec(),
×
263
                    mined_timestamp: current_header.timestamp.as_u64(),
×
264
                };
265
                utxos.push(output_block_response);
×
266
            }
267
            // We might still have inputs left to send if they are more than the outputs
268
            for input_chunk in inputs.chunks(2000) {
×
269
                let output_block_response = BlockUtxoInfo {
×
270
                    outputs: Vec::new(),
×
271
                    inputs: input_chunk.to_vec(),
×
272
                    height: current_header.height,
×
273
                    header_hash: current_header_hash.to_vec(),
×
274
                    mined_timestamp: current_header.timestamp.as_u64(),
×
275
                };
×
276
                utxos.push(output_block_response);
×
277
            }
×
278

279
            fetched_utxos += 1;
×
280

281
            if current_header.height >= tip_header.header().height {
×
282
                next_header_to_request = vec![];
×
283
                break;
×
284
            }
×
285
            if fetched_utxos >= request.limit {
×
286
                next_header_to_request = current_header.hash().to_vec();
×
287
                break;
×
288
            }
×
289

290
            current_header =
×
291
                self.db
×
292
                    .fetch_header(current_header.height + 1)
×
293
                    .await?
×
294
                    .ok_or_else(|| Error::HeaderNotFound {
×
295
                        height: current_header.height + 1,
×
296
                    })?;
×
297
            if current_header.height == end_height {
×
298
                next_header_to_request = current_header.hash().to_vec();
×
299
                break; // Stop if we reach the end height}
×
300
            }
×
301
        }
302

303
        let has_next_page = (end_height.saturating_sub(current_header.height)) > 0;
×
304

305
        Ok(SyncUtxosByBlockResponse {
×
306
            blocks: utxos,
×
307
            has_next_page,
×
308
            next_header_to_scan: next_header_to_request,
×
309
        })
×
310
    }
×
311
}
312

313
#[async_trait::async_trait]
314
impl<B: BlockchainBackend + 'static> BaseNodeWalletQueryService for Service<B> {
315
    type Error = Error;
316

317
    async fn get_tip_info(&self) -> Result<TipInfoResponse, Self::Error> {
×
318
        let state_machine = self.state_machine();
×
319
        let status_watch = state_machine.get_status_info_watch();
×
320
        let is_synced = match status_watch.borrow().state_info {
×
321
            StateInfo::Listening(li) => li.is_synced(),
×
322
            _ => false,
×
323
        };
324

325
        let metadata = self.db.get_chain_metadata().await?;
×
326

327
        Ok(TipInfoResponse {
×
328
            metadata: Some(metadata),
×
329
            is_synced,
×
330
        })
×
331
    }
×
332

333
    async fn get_header_by_height(&self, height: u64) -> Result<models::BlockHeader, Self::Error> {
×
334
        let result = self
×
335
            .db
×
336
            .fetch_header(height)
×
337
            .await?
×
338
            .ok_or(Error::HeaderNotFound { height })?
×
339
            .into();
×
340
        Ok(result)
×
341
    }
×
342

343
    async fn get_height_at_time(&self, epoch_time: u64) -> Result<u64, Self::Error> {
×
344
        trace!(target: LOG_TARGET, "requested_epoch_time: {}", epoch_time);
×
345
        let tip_header = self.db.fetch_tip_header().await?;
×
346

347
        let mut left_height = 0u64;
×
348
        let mut right_height = tip_header.height();
×
349

350
        while left_height <= right_height {
×
351
            let mut mid_height = (left_height + right_height) / 2;
×
352

353
            if mid_height == 0 {
×
354
                return Ok(0u64);
×
355
            }
×
356
            // If the two bounds are adjacent then perform the test between the right and left sides
357
            if left_height == mid_height {
×
358
                mid_height = right_height;
×
359
            }
×
360

361
            let mid_header = self
×
362
                .db
×
363
                .fetch_header(mid_height)
×
364
                .await?
×
365
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height })?;
×
366
            let before_mid_header = self
×
367
                .db
×
368
                .fetch_header(mid_height - 1)
×
369
                .await?
×
370
                .ok_or_else(|| Error::HeaderNotFound { height: mid_height - 1 })?;
×
371
            trace!(
×
372
                target: LOG_TARGET,
×
373
                "requested_epoch_time: {}, left: {}, mid: {}/{} ({}/{}), right: {}",
×
374
                epoch_time,
375
                left_height,
376
                mid_height,
377
                mid_height-1,
×
378
                mid_header.timestamp.as_u64(),
×
379
                before_mid_header.timestamp.as_u64(),
×
380
                right_height
381
            );
382
            if epoch_time < mid_header.timestamp.as_u64() && epoch_time >= before_mid_header.timestamp.as_u64() {
×
383
                trace!(
×
384
                    target: LOG_TARGET,
×
385
                    "requested_epoch_time: {}, selected height: {}",
×
386
                    epoch_time, before_mid_header.height
387
                );
388
                return Ok(before_mid_header.height);
×
389
            } else if mid_height == right_height {
×
390
                trace!(
×
391
                    target: LOG_TARGET,
×
392
                    "requested_epoch_time: {epoch_time}, selected height: {right_height}"
×
393
                );
394
                return Ok(right_height);
×
395
            } else if epoch_time <= mid_header.timestamp.as_u64() {
×
396
                right_height = mid_height;
×
397
            } else {
×
398
                left_height = mid_height;
×
399
            }
×
400
        }
401

402
        Ok(0u64)
×
403
    }
×
404

405
    async fn transaction_query(
406
        &self,
407
        signature: crate::base_node::rpc::models::Signature,
408
    ) -> Result<TxQueryResponse, Self::Error> {
×
409
        let signature = signature.try_into().map_err(Error::SignatureConversion)?;
×
410

411
        let response = self.fetch_kernel(signature).await?;
×
412

413
        Ok(response)
×
414
    }
×
415

416
    async fn sync_utxos_by_block(
417
        &self,
418
        request: SyncUtxosByBlockRequest,
419
    ) -> Result<SyncUtxosByBlockResponse, Self::Error> {
×
420
        self.fetch_utxos(request).await
×
421
    }
×
422

423
    async fn get_utxos_by_block(
424
        &self,
425
        request: GetUtxosByBlockRequest,
426
    ) -> Result<GetUtxosByBlockResponse, Self::Error> {
×
427
        self.fetch_utxos_by_block(request).await
×
428
    }
×
429

430
    async fn get_utxos_mined_info(
431
        &self,
432
        request: models::GetUtxosMinedInfoRequest,
433
    ) -> Result<models::GetUtxosMinedInfoResponse, Self::Error> {
×
434
        request.validate()?;
×
435

436
        let mut utxos = vec![];
×
437

438
        let tip_header = self.db().fetch_tip_header().await?;
×
439
        for hash in request.hashes {
×
440
            let hash = hash.try_into()?;
×
441
            let output = self.db().fetch_output(hash).await?;
×
442
            if let Some(output) = output {
×
443
                utxos.push(models::MinedUtxoInfo {
×
444
                    utxo_hash: hash.to_vec(),
×
445
                    mined_in_hash: output.header_hash.to_vec(),
×
446
                    mined_in_height: output.mined_height,
×
447
                    mined_in_timestamp: output.mined_timestamp,
×
448
                });
×
449
            }
×
450
        }
451

452
        Ok(models::GetUtxosMinedInfoResponse {
×
453
            utxos,
×
454
            best_block_hash: tip_header.hash().to_vec(),
×
455
            best_block_height: tip_header.height(),
×
456
        })
×
457
    }
×
458

459
    async fn get_utxos_deleted_info(
460
        &self,
461
        request: models::GetUtxosDeletedInfoRequest,
462
    ) -> Result<models::GetUtxosDeletedInfoResponse, Self::Error> {
×
463
        request.validate()?;
×
464

465
        let mut utxos = vec![];
×
466

467
        let must_include_header = request.must_include_header.clone().try_into()?;
×
468
        if self
×
469
            .db()
×
470
            .fetch_header_by_block_hash(must_include_header)
×
471
            .await?
×
472
            .is_none()
×
473
        {
474
            return Err(Error::HeaderHashNotFound);
×
475
        }
×
476

477
        let tip_header = self.db().fetch_tip_header().await?;
×
478
        for hash in request.hashes {
×
479
            let hash = hash.try_into()?;
×
480
            let output = self.db().fetch_output(hash).await?;
×
481

482
            if let Some(output) = output {
×
483
                // is it still unspent?
484
                let input = self.db().fetch_input(hash).await?;
×
485
                if let Some(i) = input {
×
486
                    utxos.push(models::DeletedUtxoInfo {
×
487
                        utxo_hash: hash.to_vec(),
×
488
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
489
                        spent_in_header: Some((i.spent_height, i.header_hash.to_vec())),
×
490
                    });
×
491
                } else {
×
492
                    utxos.push(models::DeletedUtxoInfo {
×
493
                        utxo_hash: hash.to_vec(),
×
494
                        found_in_header: Some((output.mined_height, output.header_hash.to_vec())),
×
495
                        spent_in_header: None,
×
496
                    });
×
497
                }
×
498
            } else {
×
499
                utxos.push(models::DeletedUtxoInfo {
×
500
                    utxo_hash: hash.to_vec(),
×
501
                    found_in_header: None,
×
502
                    spent_in_header: None,
×
503
                });
×
504
            }
×
505
        }
506

507
        Ok(models::GetUtxosDeletedInfoResponse {
×
508
            utxos,
×
509
            best_block_hash: tip_header.hash().to_vec(),
×
510
            best_block_height: tip_header.height(),
×
511
        })
×
512
    }
×
513

514
    async fn generate_kernel_merkle_proof(
515
        &self,
516
        excess_sig: types::CompressedSignature,
517
    ) -> Result<GenerateKernelMerkleProofResponse, Self::Error> {
×
518
        let proof = self.db().generate_kernel_merkle_proof(excess_sig).await?;
×
519
        Ok(GenerateKernelMerkleProofResponse {
520
            encoded_merkle_proof: bincode::serialize(&proof.merkle_proof).map_err(Error::general)?,
×
521
            block_hash: proof.block_hash,
×
522
            leaf_index: proof.leaf_index.value() as u64,
×
523
        })
524
    }
×
525
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc