• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 22761905253

06 Mar 2026 11:40AM UTC coverage: 62.01% (+0.05%) from 61.96%
22761905253

push

github

web-flow
chore: upgrade core to 2024 (#7693)

Description
---
Upgrades tari core to rust editition 2024

560 of 915 new or added lines in 96 files covered. (61.2%)

26 existing lines in 15 files now uncovered.

71928 of 115994 relevant lines covered (62.01%)

225062.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.53
/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
1
// Copyright 2021. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::{TryFrom, TryInto},
25
    sync::Arc,
26
    time::Instant,
27
};
28

29
use log::*;
30
use tari_comms::{
31
    peer_manager::NodeId,
32
    protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
33
    utils,
34
};
35
use tari_node_components::blocks::BlockHeader;
36
use tari_utilities::{ByteArray, hex::Hex};
37
use tokio::{sync::mpsc, task};
38

39
#[cfg(feature = "metrics")]
40
use crate::base_node::metrics;
41
use crate::{
42
    chain_storage::{BlockchainBackend, async_db::AsyncBlockchainDb},
43
    proto,
44
    proto::base_node::{SyncUtxosRequest, SyncUtxosResponse, sync_utxos_response::Txo},
45
};
46

47
const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";
48

49
pub(crate) struct SyncUtxosTask<B> {
50
    db: AsyncBlockchainDb<B>,
51
    peer_node_id: Arc<NodeId>,
52
}
53

54
impl<B> SyncUtxosTask<B>
55
where B: BlockchainBackend + 'static
56
{
57
    pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
2✔
58
        Self { db, peer_node_id }
2✔
59
    }
2✔
60

61
    pub(crate) async fn run(
2✔
62
        self,
2✔
63
        request: Request<SyncUtxosRequest>,
2✔
64
        mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
2✔
65
    ) -> Result<(), RpcStatus> {
2✔
66
        let msg = request.into_message();
2✔
67
        let start_hash = msg
2✔
68
            .start_header_hash
2✔
69
            .clone()
2✔
70
            .try_into()
2✔
71
            .rpc_status_bad_request("Invalid header hash")?;
2✔
72

73
        let start_header = self
2✔
74
            .db
2✔
75
            .fetch_header_by_block_hash(start_hash)
2✔
76
            .await
2✔
77
            .rpc_status_internal_error(LOG_TARGET)?
2✔
78
            .ok_or_else(|| RpcStatus::not_found("Start header hash was not found"))?;
2✔
79

80
        let end_hash = msg
1✔
81
            .end_header_hash
1✔
82
            .clone()
1✔
83
            .try_into()
1✔
84
            .rpc_status_bad_request("Invalid header hash")?;
1✔
85

86
        let end_header = self
1✔
87
            .db
1✔
88
            .fetch_header_by_block_hash(end_hash)
1✔
89
            .await
1✔
90
            .rpc_status_internal_error(LOG_TARGET)?
1✔
91
            .ok_or_else(|| RpcStatus::not_found("End header hash was not found"))?;
1✔
92
        if start_header.height > end_header.height {
×
93
            return Err(RpcStatus::bad_request(&format!(
×
94
                "Start header height({}) cannot be greater than the end header height({})",
×
95
                start_header.height, end_header.height
×
96
            )));
×
97
        }
×
98

99
        task::spawn(async move {
×
100
            debug!(
×
101
                target: LOG_TARGET,
×
102
                "Starting UTXO stream for peer '{}'", self.peer_node_id
103
            );
104
            if let Err(err) = self.start_streaming(&mut tx, start_header, end_header).await {
×
105
                debug!(
×
106
                    target: LOG_TARGET,
×
107
                    "UTXO stream errored for peer '{}': {}", self.peer_node_id, err
108
                );
109
                let _result = tx.send(Err(err)).await;
×
110
            }
×
111
            debug!(
×
112
                target: LOG_TARGET,
×
113
                "UTXO stream completed for peer '{}'", self.peer_node_id
114
            );
115
            #[cfg(feature = "metrics")]
116
            metrics::active_sync_peers().dec();
×
117
        });
×
118

119
        Ok(())
×
120
    }
2✔
121

122
    #[allow(clippy::too_many_lines)]
123
    async fn start_streaming(
×
124
        &self,
×
125
        tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
×
126
        mut current_header: BlockHeader,
×
127
        end_header: BlockHeader,
×
128
    ) -> Result<(), RpcStatus> {
×
129
        debug!(
×
130
            target: LOG_TARGET,
×
131
            "Starting stream task with current_header: {}, end_header: {}",
132
            current_header.hash().to_hex(),
×
133
            end_header.hash().to_hex(),
×
134
        );
135

136
        // If this is a pruned node and outputs have been requested for an initial sync, we need to discover and send
137
        // the outputs from the genesis block that have been pruned as well
138
        let mut pruned_genesis_block_outputs = Vec::new();
×
139
        let metadata = self
×
140
            .db
×
141
            .get_chain_metadata()
×
142
            .await
×
143
            .rpc_status_internal_error(LOG_TARGET)?;
×
144
        if current_header.height == 1 && metadata.is_pruned_node() {
×
145
            let genesis_block = self.db.fetch_genesis_block();
×
146
            for output in genesis_block.block().body.outputs() {
×
147
                let output_hash = output.hash();
×
148
                if self
×
149
                    .db
×
150
                    .fetch_output(output_hash)
×
151
                    .await
×
152
                    .rpc_status_internal_error(LOG_TARGET)?
×
153
                    .is_none()
×
154
                {
155
                    trace!(
×
156
                        target: LOG_TARGET,
×
157
                        "Spent genesis TXO (commitment '{}') to peer",
158
                        output.commitment.to_hex()
×
159
                    );
160
                    pruned_genesis_block_outputs.push(Ok(SyncUtxosResponse {
×
161
                        txo: Some(Txo::Commitment(output.commitment.as_bytes().to_vec())),
×
162
                        mined_header: current_header.hash().to_vec(),
×
163
                    }));
×
164
                }
×
165
            }
166
        }
×
167

168
        let start_header = current_header.clone();
×
169
        loop {
170
            let timer = Instant::now();
×
171
            let current_header_hash = current_header.hash();
×
172
            debug!(
×
173
                target: LOG_TARGET,
×
174
                "Streaming TXO(s) for block #{} ({})",
175
                current_header.height,
176
                current_header_hash.to_hex()
×
177
            );
178
            if tx.is_closed() {
×
179
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
180
                break;
×
181
            }
×
182

183
            let outputs_with_statuses = self
×
184
                .db
×
185
                .fetch_outputs_in_block_with_spend_state(current_header_hash, Some(end_header.hash()))
×
186
                .await
×
187
                .rpc_status_internal_error(LOG_TARGET)?;
×
188
            if tx.is_closed() {
×
189
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
190
                break;
×
191
            }
×
192

193
            let mut outputs = Vec::with_capacity(outputs_with_statuses.len());
×
194
            for (output, spent) in outputs_with_statuses {
×
195
                if output.is_burned() {
×
196
                    continue;
×
197
                }
×
198
                if !spent {
×
199
                    match proto::types::TransactionOutput::try_from(output.clone()) {
×
200
                        Ok(tx_ouput) => {
×
201
                            trace!(
×
202
                                target: LOG_TARGET,
×
203
                                "Unspent TXO (commitment '{}') to peer",
204
                                output.commitment.to_hex()
×
205
                            );
206
                            outputs.push(Ok(SyncUtxosResponse {
×
207
                                txo: Some(Txo::Output(tx_ouput)),
×
208
                                mined_header: current_header_hash.to_vec(),
×
209
                            }));
×
210
                        },
211
                        Err(e) => {
×
212
                            return Err(RpcStatus::general(&format!(
×
213
                                "Output '{}' RPC conversion error ({})",
×
214
                                output.hash().to_hex(),
×
215
                                e
×
NEW
216
                            )));
×
217
                        },
218
                    }
219
                }
×
220
            }
221
            debug!(
×
222
                target: LOG_TARGET,
×
223
                "Adding {} outputs in response for block #{} '{}'", outputs.len(),
×
224
                current_header.height,
225
                current_header_hash
226
            );
227

228
            let inputs_in_block = self
×
229
                .db
×
230
                .fetch_inputs_in_block(current_header_hash)
×
231
                .await
×
232
                .rpc_status_internal_error(LOG_TARGET)?;
×
233
            if tx.is_closed() {
×
234
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
235
                break;
×
236
            }
×
237

238
            let mut inputs = Vec::with_capacity(inputs_in_block.len());
×
239
            for input in inputs_in_block {
×
240
                let output_from_current_tranche = if let Some(mined_info) = self
×
241
                    .db
×
242
                    .fetch_output(input.output_hash())
×
243
                    .await
×
244
                    .rpc_status_internal_error(LOG_TARGET)?
×
245
                {
246
                    mined_info.mined_height >= start_header.height
×
247
                } else {
248
                    false
×
249
                };
250

251
                if output_from_current_tranche {
×
252
                    trace!(target: LOG_TARGET, "Spent TXO (hash '{}') not sent to peer", input.output_hash().to_hex());
×
253
                } else {
254
                    let input_commitment = match self.db.fetch_output(input.output_hash()).await {
×
255
                        Ok(Some(o)) => o.output.commitment,
×
256
                        Ok(None) => {
257
                            return Err(RpcStatus::general(&format!(
×
258
                                "Mined info for input '{}' not found",
×
259
                                input.output_hash().to_hex()
×
NEW
260
                            )));
×
261
                        },
262
                        Err(e) => {
×
263
                            return Err(RpcStatus::general(&format!(
×
264
                                "Input '{}' not found ({})",
×
265
                                input.output_hash().to_hex(),
×
266
                                e
×
NEW
267
                            )));
×
268
                        },
269
                    };
270
                    trace!(target: LOG_TARGET, "Spent TXO (commitment '{}') to peer", input_commitment.to_hex());
×
271
                    inputs.push(Ok(SyncUtxosResponse {
×
272
                        txo: Some(Txo::Commitment(input_commitment.as_bytes().to_vec())),
×
273
                        mined_header: current_header_hash.to_vec(),
×
274
                    }));
×
275
                }
276
            }
277
            debug!(
×
278
                target: LOG_TARGET,
×
279
                "Adding {} inputs in response for block #{} '{}'", inputs.len(),
×
280
                current_header.height,
281
                current_header_hash
282
            );
283

284
            let mut txos = Vec::with_capacity(outputs.len() + inputs.len());
×
285
            txos.append(&mut outputs);
×
286
            txos.append(&mut inputs);
×
287
            if start_header == current_header {
×
288
                debug!(
×
289
                    target: LOG_TARGET,
×
290
                    "Adding {} genesis block pruned inputs in response for block #{} '{}'", pruned_genesis_block_outputs.len(),
×
291
                    current_header.height,
292
                    current_header_hash
293
                );
294
                txos.append(&mut pruned_genesis_block_outputs);
×
295
            }
×
296
            let txos = txos.into_iter();
×
297

298
            // Ensure task stops if the peer prematurely stops their RPC session
299
            let txos_len = txos.len();
×
300
            if utils::mpsc::send_all(tx, txos).await.is_err() {
×
301
                break;
×
302
            }
×
303

304
            debug!(
×
305
                target: LOG_TARGET,
×
306
                "Streamed {} TXOs in {:.2?} (including stream backpressure)",
307
                txos_len,
308
                timer.elapsed()
×
309
            );
310

311
            if current_header.height + 1 > end_header.height {
×
312
                break;
×
313
            }
×
314

315
            current_header = self
×
316
                .db
×
317
                .fetch_header(current_header.height + 1)
×
318
                .await
×
319
                .rpc_status_internal_error(LOG_TARGET)?
×
320
                .ok_or_else(|| {
×
321
                    RpcStatus::general(&format!(
×
322
                        "Potential data consistency issue: header {} not found",
×
323
                        current_header.height + 1
×
324
                    ))
×
325
                })?;
×
326
        }
327

328
        debug!(
×
329
            target: LOG_TARGET,
×
330
            "TXO sync completed to Header hash = {}",
331
            current_header.hash().to_hex()
×
332
        );
333

334
        Ok(())
×
335
    }
×
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc