• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 16123384529

07 Jul 2025 05:11PM UTC coverage: 64.327% (-7.6%) from 71.89%
16123384529

push

github

web-flow
chore: new release v4.9.0-pre.0 (#7289)

Description
---
new release esmeralda

77151 of 119935 relevant lines covered (64.33%)

227108.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/base_layer/core/src/base_node/rpc/sync_utxos_by_block_task.rs
1
// Copyright 2025 The Tari Project
2
// SPDX-License-Identifier: BSD-3-Clause
3

4
use std::{convert::TryInto, time::Instant};
5

6
use log::*;
7
use tari_comms::protocol::rpc::{RpcStatus, RpcStatusResultExt};
8
use tari_utilities::hex::Hex;
9
use tokio::{sync::mpsc, task};
10

11
use crate::{
12
    blocks::BlockHeader,
13
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
14
    proto,
15
    proto::base_node::{SyncUtxosByBlockRequest, SyncUtxosByBlockResponse},
16
};
17

18
const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_by_block_task";
19

20
pub(crate) struct SyncUtxosByBlockTask<B> {
21
    db: AsyncBlockchainDb<B>,
22
}
23

24
impl<B> SyncUtxosByBlockTask<B>
25
where B: BlockchainBackend + 'static
26
{
27
    pub(crate) fn new(db: AsyncBlockchainDb<B>) -> Self {
×
28
        Self { db }
×
29
    }
×
30

31
    pub(crate) async fn run(
×
32
        self,
×
33
        request: SyncUtxosByBlockRequest,
×
34
        mut tx: mpsc::Sender<Result<SyncUtxosByBlockResponse, RpcStatus>>,
×
35
    ) -> Result<(), RpcStatus> {
×
36
        let hash = request
×
37
            .start_header_hash
×
38
            .clone()
×
39
            .try_into()
×
40
            .rpc_status_internal_error(LOG_TARGET)?;
×
41
        let start_header = self
×
42
            .db
×
43
            .fetch_header_by_block_hash(hash)
×
44
            .await
×
45
            .rpc_status_internal_error(LOG_TARGET)?
×
46
            .ok_or_else(|| RpcStatus::not_found("Start header hash is was not found"))?;
×
47
        let hash = request
×
48
            .end_header_hash
×
49
            .clone()
×
50
            .try_into()
×
51
            .rpc_status_internal_error(LOG_TARGET)?;
×
52
        let end_header = self
×
53
            .db
×
54
            .fetch_header_by_block_hash(hash)
×
55
            .await
×
56
            .rpc_status_internal_error(LOG_TARGET)?
×
57
            .ok_or_else(|| RpcStatus::not_found("End header hash is was not found"))?;
×
58

59
        if start_header.height > end_header.height {
×
60
            return Err(RpcStatus::bad_request(&format!(
×
61
                "start header height {} cannot be greater than the end header height ({})",
×
62
                start_header.height, end_header.height
×
63
            )));
×
64
        }
×
65

×
66
        task::spawn(async move {
×
67
            if let Err(err) = self.start_streaming(&mut tx, start_header, end_header).await {
×
68
                let _result = tx.send(Err(err)).await;
×
69
            }
×
70
        });
×
71

×
72
        Ok(())
×
73
    }
×
74

75
    #[allow(clippy::too_many_lines)]
76
    async fn start_streaming(
×
77
        &self,
×
78
        tx: &mut mpsc::Sender<Result<SyncUtxosByBlockResponse, RpcStatus>>,
×
79
        start_header: BlockHeader,
×
80
        end_header: BlockHeader,
×
81
    ) -> Result<(), RpcStatus> {
×
82
        trace!(
×
83
            target: LOG_TARGET,
×
84
            "Starting stream task with start_header: {} and end_header: {}",
×
85
            start_header.hash().to_hex(),
×
86
            end_header.hash().to_hex(),
×
87
        );
88

89
        let mut current_header = start_header;
×
90
        loop {
91
            let timer = Instant::now();
×
92
            let current_header_hash = current_header.hash();
×
93

×
94
            trace!(
×
95
                target: LOG_TARGET,
×
96
                "current header = {} ({})",
×
97
                current_header.height,
×
98
                current_header_hash.to_hex()
×
99
            );
100

101
            if tx.is_closed() {
×
102
                debug!(target: LOG_TARGET, "Exiting sync_utxos early because client has gone",);
×
103
                break;
×
104
            }
×
105

106
            let outputs_with_statuses = self
×
107
                .db
×
108
                .fetch_outputs_in_block_with_spend_state(current_header.hash(), None)
×
109
                .await
×
110
                .rpc_status_internal_error(LOG_TARGET)?;
×
111
            let outputs = outputs_with_statuses
×
112
                .into_iter()
×
113
                .map(|(output, _spent)| output.try_into())
×
114
                .collect::<Result<Vec<proto::types::TransactionOutput>, String>>()
×
115
                .map_err(|err| RpcStatus::general(&err))?;
×
116

117
            trace!(
×
118
                target: LOG_TARGET,
×
119
                "Streaming {} UTXO(s) for block #{} (Hash: {})",
×
120
                outputs.len(),
×
121
                current_header.height,
×
122
                current_header_hash.to_hex(),
×
123
            );
124

125
            for output_chunk in outputs.chunks(2000) {
×
126
                let output_block_response = SyncUtxosByBlockResponse {
×
127
                    outputs: output_chunk.to_vec(),
×
128
                    height: current_header.height,
×
129
                    header_hash: current_header_hash.to_vec(),
×
130
                    mined_timestamp: current_header.timestamp.as_u64(),
×
131
                };
×
132
                // Ensure task stops if the peer prematurely stops their RPC session
×
133
                if tx.send(Ok(output_block_response)).await.is_err() {
×
134
                    break;
×
135
                }
×
136
            }
137
            if outputs.is_empty() {
×
138
                // if its empty, we need to send an empty vec of outputs.
139
                let utxo_block_response = SyncUtxosByBlockResponse {
×
140
                    outputs: Vec::new(),
×
141
                    height: current_header.height,
×
142
                    header_hash: current_header_hash.to_vec(),
×
143
                    mined_timestamp: current_header.timestamp.as_u64(),
×
144
                };
×
145
                // Ensure task stops if the peer prematurely stops their RPC session
×
146
                if tx.send(Ok(utxo_block_response)).await.is_err() {
×
147
                    break;
×
148
                }
×
149
            }
×
150

151
            trace!(
×
152
                target: LOG_TARGET,
×
153
                "Streamed utxos in {:.2?} (including stream backpressure)",
×
154
                timer.elapsed()
×
155
            );
156

157
            if current_header.height >= end_header.height {
×
158
                break;
×
159
            }
×
160

161
            current_header = self
×
162
                .db
×
163
                .fetch_header(current_header.height + 1)
×
164
                .await
×
165
                .rpc_status_internal_error(LOG_TARGET)?
×
166
                .ok_or_else(|| {
×
167
                    RpcStatus::general(&format!(
×
168
                        "Potential data consistency issue: header {} not found",
×
169
                        current_header.height + 1
×
170
                    ))
×
171
                })?;
×
172
        }
173

174
        trace!(
×
175
            target: LOG_TARGET,
×
176
            "UTXO sync by block completed to UTXO {} (Header hash = {})",
×
177
            current_header.output_smt_size,
×
178
            current_header.hash().to_hex()
×
179
        );
180

181
        Ok(())
×
182
    }
×
183
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc