• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 28244404192

26 Jun 2026 02:26PM UTC coverage: 60.886% (-0.9%) from 61.821%
28244404192

push

github

web-flow
feat: harden prune mode (#7902)

Description
---
Harden prune mode sync; it can fail when the last block in the
tranche has 0 outputs.

0 of 18 new or added lines in 2 files covered. (0.0%)

1209 existing lines in 23 files now uncovered.

70905 of 116455 relevant lines covered (60.89%)

537864.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.48
/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
1
// Copyright 2021. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::{TryFrom, TryInto},
25
    sync::Arc,
26
    time::Instant,
27
};
28

29
use log::*;
30
use tari_comms::{
31
    peer_manager::NodeId,
32
    protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
33
    utils,
34
};
35
use tari_node_components::blocks::BlockHeader;
36
use tari_utilities::{ByteArray, hex::Hex};
37
use tokio::{sync::mpsc, task};
38

39
#[cfg(feature = "metrics")]
40
use crate::base_node::metrics;
41
use crate::{
42
    chain_storage::{BlockchainBackend, async_db::AsyncBlockchainDb},
43
    proto,
44
    proto::base_node::{SyncUtxosRequest, SyncUtxosResponse, sync_utxos_response::Txo},
45
};
46

47
/// Log target passed to every `debug!`/`trace!` call made by the UTXO sync task in this file.
const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";
48

49
/// Serves a single `SyncUtxosRequest`: validates the requested header range and streams the matching
/// `SyncUtxosResponse` messages back to the requesting peer over an mpsc channel.
pub(crate) struct SyncUtxosTask<B> {
50
    /// Async blockchain database handle used to look up headers, outputs, inputs and chain metadata.
    db: AsyncBlockchainDb<B>,
51
    /// Node id of the peer being served; in this file it is only used in log messages.
    peer_node_id: Arc<NodeId>,
52
}
53

54
impl<B> SyncUtxosTask<B>
55
where B: BlockchainBackend + 'static
56
{
57
    pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
1✔
58
        Self { db, peer_node_id }
1✔
59
    }
1✔
60

61
    pub(crate) async fn run(
1✔
62
        self,
1✔
63
        request: Request<SyncUtxosRequest>,
1✔
64
        mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
1✔
65
    ) -> Result<(), RpcStatus> {
1✔
66
        let msg = request.into_message();
1✔
67
        let start_hash = msg
1✔
68
            .start_header_hash
1✔
69
            .clone()
1✔
70
            .try_into()
1✔
71
            .rpc_status_bad_request("Invalid header hash")?;
1✔
72

73
        let start_header = self
1✔
74
            .db
1✔
75
            .fetch_header_by_block_hash(start_hash)
1✔
76
            .await
1✔
77
            .rpc_status_internal_error(LOG_TARGET)?
1✔
78
            .ok_or_else(|| RpcStatus::not_found("Start header hash was not found"))?;
1✔
79

80
        let end_hash = msg
1✔
81
            .end_header_hash
1✔
82
            .clone()
1✔
83
            .try_into()
1✔
84
            .rpc_status_bad_request("Invalid header hash")?;
1✔
85

86
        let end_header = self
1✔
87
            .db
1✔
88
            .fetch_header_by_block_hash(end_hash)
1✔
89
            .await
1✔
90
            .rpc_status_internal_error(LOG_TARGET)?
1✔
91
            .ok_or_else(|| RpcStatus::not_found("End header hash was not found"))?;
1✔
92
        if start_header.height > end_header.height {
×
93
            return Err(RpcStatus::bad_request(&format!(
×
94
                "Start header height({}) cannot be greater than the end header height({})",
×
95
                start_header.height, end_header.height
×
96
            )));
×
97
        }
×
98

99
        task::spawn(async move {
×
100
            debug!(
×
101
                target: LOG_TARGET,
×
102
                "Starting UTXO stream for peer '{}'", self.peer_node_id
103
            );
104
            if let Err(err) = self.start_streaming(&mut tx, start_header, end_header).await {
×
105
                debug!(
×
106
                    target: LOG_TARGET,
×
107
                    "UTXO stream errored for peer '{}': {}", self.peer_node_id, err
108
                );
109
                let _result = tx.send(Err(err)).await;
×
110
            }
×
111
            debug!(
×
112
                target: LOG_TARGET,
×
113
                "UTXO stream completed for peer '{}'", self.peer_node_id
114
            );
115
            #[cfg(feature = "metrics")]
116
            metrics::active_sync_peers().dec();
×
117
        });
×
118

119
        Ok(())
×
120
    }
1✔
121

122
    #[allow(clippy::too_many_lines)]
123
    async fn start_streaming(
×
124
        &self,
×
125
        tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
×
126
        mut current_header: BlockHeader,
×
127
        end_header: BlockHeader,
×
128
    ) -> Result<(), RpcStatus> {
×
129
        debug!(
×
130
            target: LOG_TARGET,
×
131
            "Starting stream task with current_header: {}, end_header: {}",
132
            current_header.hash().to_hex(),
×
133
            end_header.hash().to_hex(),
×
134
        );
135

136
        // If this is a pruned node and outputs have been requested for an initial sync, we need to discover and send
137
        // the outputs from the genesis block that have been pruned as well
138
        let mut pruned_genesis_block_outputs = Vec::new();
×
139
        let metadata = self
×
140
            .db
×
141
            .get_chain_metadata()
×
142
            .await
×
143
            .rpc_status_internal_error(LOG_TARGET)?;
×
144
        if current_header.height == 1 && metadata.is_pruned_node() {
×
145
            let genesis_block = self.db.fetch_genesis_block();
×
146
            for output in genesis_block.block().body.outputs() {
×
147
                let output_hash = output.hash();
×
148
                if self
×
149
                    .db
×
150
                    .fetch_output(output_hash)
×
151
                    .await
×
152
                    .rpc_status_internal_error(LOG_TARGET)?
×
153
                    .is_none()
×
154
                {
155
                    trace!(
×
156
                        target: LOG_TARGET,
×
157
                        "Spent genesis TXO (commitment '{}') to peer",
158
                        output.commitment.to_hex()
×
159
                    );
160
                    pruned_genesis_block_outputs.push(Ok(SyncUtxosResponse {
×
161
                        txo: Some(Txo::Commitment(output.commitment.as_bytes().to_vec())),
×
162
                        mined_header: current_header.hash().to_vec(),
×
163
                    }));
×
164
                }
×
165
            }
166
        }
×
167

168
        let start_header = current_header.clone();
×
169
        // Tracks whether the stream walked all the way to `end_header`. Only on a natural completion
170
        // do we emit the terminator below; an early break (peer left, send failed) must not, so that
171
        // a truncated stream is not mistaken for a complete one by the consumer.
NEW
172
        let mut reached_end = false;
×
173
        loop {
174
            let timer = Instant::now();
×
175
            let current_header_hash = current_header.hash();
×
176
            debug!(
×
177
                target: LOG_TARGET,
×
178
                "Streaming TXO(s) for block #{} ({})",
179
                current_header.height,
180
                current_header_hash.to_hex()
×
181
            );
182
            if tx.is_closed() {
×
183
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
184
                break;
×
185
            }
×
186

187
            let outputs_with_statuses = self
×
188
                .db
×
189
                .fetch_outputs_in_block_with_spend_state(current_header_hash, Some(end_header.hash()))
×
190
                .await
×
191
                .rpc_status_internal_error(LOG_TARGET)?;
×
192
            if tx.is_closed() {
×
193
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
194
                break;
×
195
            }
×
196

197
            let mut outputs = Vec::with_capacity(outputs_with_statuses.len());
×
198
            for (output, spent) in outputs_with_statuses {
×
199
                if output.is_burned() {
×
200
                    continue;
×
201
                }
×
202
                if !spent {
×
203
                    match proto::types::TransactionOutput::try_from(output.clone()) {
×
204
                        Ok(tx_ouput) => {
×
205
                            trace!(
×
206
                                target: LOG_TARGET,
×
207
                                "Unspent TXO (commitment '{}') to peer",
208
                                output.commitment.to_hex()
×
209
                            );
210
                            outputs.push(Ok(SyncUtxosResponse {
×
211
                                txo: Some(Txo::Output(tx_ouput)),
×
212
                                mined_header: current_header_hash.to_vec(),
×
213
                            }));
×
214
                        },
215
                        Err(e) => {
×
216
                            return Err(RpcStatus::general(&format!(
×
217
                                "Output '{}' RPC conversion error ({})",
×
218
                                output.hash().to_hex(),
×
219
                                e
×
220
                            )));
×
221
                        },
222
                    }
223
                }
×
224
            }
225
            debug!(
×
226
                target: LOG_TARGET,
×
227
                "Adding {} outputs in response for block #{} '{}'", outputs.len(),
×
228
                current_header.height,
229
                current_header_hash
230
            );
231

232
            let inputs_in_block = self
×
233
                .db
×
234
                .fetch_inputs_in_block(current_header_hash)
×
235
                .await
×
236
                .rpc_status_internal_error(LOG_TARGET)?;
×
237
            if tx.is_closed() {
×
238
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
239
                break;
×
240
            }
×
241

242
            let mut inputs = Vec::with_capacity(inputs_in_block.len());
×
243
            for input in inputs_in_block {
×
244
                let output_from_current_tranche = if let Some(mined_info) = self
×
245
                    .db
×
246
                    .fetch_output(input.output_hash())
×
247
                    .await
×
248
                    .rpc_status_internal_error(LOG_TARGET)?
×
249
                {
250
                    mined_info.mined_height >= start_header.height
×
251
                } else {
252
                    false
×
253
                };
254

255
                if output_from_current_tranche {
×
256
                    trace!(target: LOG_TARGET, "Spent TXO (hash '{}') not sent to peer", input.output_hash().to_hex());
×
257
                } else {
258
                    let input_commitment = match self.db.fetch_output(input.output_hash()).await {
×
259
                        Ok(Some(o)) => o.output.commitment,
×
260
                        Ok(None) => {
261
                            return Err(RpcStatus::general(&format!(
×
262
                                "Mined info for input '{}' not found",
×
263
                                input.output_hash().to_hex()
×
264
                            )));
×
265
                        },
266
                        Err(e) => {
×
267
                            return Err(RpcStatus::general(&format!(
×
268
                                "Input '{}' not found ({})",
×
269
                                input.output_hash().to_hex(),
×
270
                                e
×
271
                            )));
×
272
                        },
273
                    };
274
                    trace!(target: LOG_TARGET, "Spent TXO (commitment '{}') to peer", input_commitment.to_hex());
×
275
                    inputs.push(Ok(SyncUtxosResponse {
×
276
                        txo: Some(Txo::Commitment(input_commitment.as_bytes().to_vec())),
×
277
                        mined_header: current_header_hash.to_vec(),
×
278
                    }));
×
279
                }
280
            }
281
            debug!(
×
282
                target: LOG_TARGET,
×
283
                "Adding {} inputs in response for block #{} '{}'", inputs.len(),
×
284
                current_header.height,
285
                current_header_hash
286
            );
287

288
            let mut txos = Vec::with_capacity(outputs.len() + inputs.len());
×
289
            txos.append(&mut outputs);
×
290
            txos.append(&mut inputs);
×
291
            if start_header == current_header {
×
292
                debug!(
×
293
                    target: LOG_TARGET,
×
294
                    "Adding {} genesis block pruned inputs in response for block #{} '{}'", pruned_genesis_block_outputs.len(),
×
295
                    current_header.height,
296
                    current_header_hash
297
                );
298
                txos.append(&mut pruned_genesis_block_outputs);
×
299
            }
×
300
            let txos = txos.into_iter();
×
301

302
            // Ensure task stops if the peer prematurely stops their RPC session
303
            let txos_len = txos.len();
×
304
            if utils::mpsc::send_all(tx, txos).await.is_err() {
×
305
                break;
×
306
            }
×
307

308
            debug!(
×
309
                target: LOG_TARGET,
×
310
                "Streamed {} TXOs in {:.2?} (including stream backpressure)",
311
                txos_len,
312
                timer.elapsed()
×
313
            );
314

315
            if current_header.height + 1 > end_header.height {
×
NEW
316
                reached_end = true;
×
317
                break;
×
318
            }
×
319

320
            current_header = self
×
321
                .db
×
322
                .fetch_header(current_header.height + 1)
×
323
                .await
×
324
                .rpc_status_internal_error(LOG_TARGET)?
×
325
                .ok_or_else(|| {
×
326
                    RpcStatus::general(&format!(
×
327
                        "Potential data consistency issue: header {} not found",
×
328
                        current_header.height + 1
×
329
                    ))
×
330
                })?;
×
331
        }
332

333
        // Always send a final terminator response tagged with the end header. It carries no TXO, so the
334
        // consumer can verify the stream reached the requested end even when the final block(s) stream no
335
        // outputs (e.g. an empty block, or a coinbase that was later spent and pruned on this node). Without
336
        // it, the consumer would infer completeness from the last data-bearing message, which may belong to
337
        // an earlier block.
NEW
338
        if reached_end {
×
NEW
339
            let end_header_hash = end_header.hash();
×
NEW
340
            if tx
×
NEW
341
                .send(Ok(SyncUtxosResponse {
×
NEW
342
                    txo: None,
×
NEW
343
                    mined_header: end_header_hash.to_vec(),
×
NEW
344
                }))
×
NEW
345
                .await
×
NEW
346
                .is_err()
×
347
            {
NEW
348
                debug!(
×
NEW
349
                    target: LOG_TARGET,
×
350
                    "Peer '{}' exited TXO sync session before terminator could be sent", self.peer_node_id
351
                );
NEW
352
            }
×
NEW
353
        }
×
354

355
        debug!(
×
356
            target: LOG_TARGET,
×
357
            "TXO sync completed to Header hash = {}",
358
            current_header.hash().to_hex()
×
359
        );
360

361
        Ok(())
×
362
    }
×
363
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc