• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 8232002250

11 Mar 2024 11:26AM UTC coverage: 75.965% (-0.04%) from 76.004%
8232002250

push

github

web-flow
chore: new release (#6201)

Description
---
New release for esme with new gen block

75261 of 99073 relevant lines covered (75.97%)

305118.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.79
/base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
1
// Copyright 2021. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
use std::{
24
    convert::{TryFrom, TryInto},
25
    sync::Arc,
26
    time::Instant,
27
};
28

29
use log::*;
30
use tari_comms::{
31
    peer_manager::NodeId,
32
    protocol::rpc::{Request, RpcStatus, RpcStatusResultExt},
33
    utils,
34
};
35
use tari_utilities::{hex::Hex, ByteArray};
36
use tokio::{sync::mpsc, task};
37

38
#[cfg(feature = "metrics")]
39
use crate::base_node::metrics;
40
use crate::{
41
    blocks::BlockHeader,
42
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
43
    proto,
44
    proto::base_node::{sync_utxos_response::Txo, SyncUtxosRequest, SyncUtxosResponse},
45
};
46

47
// Log target handed to the `log` macros and to `rpc_status_internal_error` throughout this file.
const LOG_TARGET: &str = "c::base_node::sync_rpc::sync_utxo_task";
48

49
/// Task that answers a `SyncUtxosRequest` by streaming `SyncUtxosResponse` messages for a range of blocks to a peer
/// over an mpsc channel.
pub(crate) struct SyncUtxosTask<B> {
50
    /// Async database handle used to look up headers, outputs, inputs and chain metadata.
    db: AsyncBlockchainDb<B>,
51
    /// Node ID of the requesting peer; in this file it is only used in log messages.
    peer_node_id: Arc<NodeId>,
52
}
53

54
impl<B> SyncUtxosTask<B>
55
where B: BlockchainBackend + 'static
56
{
57
    /// Creates a task that reads from `db` on behalf of the peer identified by `peer_node_id`.
    pub(crate) fn new(db: AsyncBlockchainDb<B>, peer_node_id: Arc<NodeId>) -> Self {
12✔
58
        Self { db, peer_node_id }
12✔
59
    }
12✔
60

61
    /// Validates the request and, if it is valid, spawns a detached task that streams the requested TXOs on `tx`.
    ///
    /// The start and end header hashes are taken from the request message and resolved to headers through the
    /// database before anything is spawned. `Ok(())` is returned as soon as the streaming task has been spawned; the
    /// stream itself continues in the background and reports a streaming error by sending it on `tx`.
    ///
    /// # Errors
    ///
    /// * bad request - a header hash in the request cannot be converted, or the start header height is greater than
    ///   the end header height
    /// * not found - the start or end header hash is not known to the database
    /// * internal error - the database lookup itself failed
    pub(crate) async fn run(
12✔
62
        self,
12✔
63
        request: Request<SyncUtxosRequest>,
12✔
64
        mut tx: mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
12✔
65
    ) -> Result<(), RpcStatus> {
12✔
66
        let msg = request.into_message();
12✔
67
        // Resolve the start of the requested range: hash bytes -> hash type -> header.
        let start_hash = msg
12✔
68
            .start_header_hash
12✔
69
            .clone()
12✔
70
            .try_into()
12✔
71
            .rpc_status_bad_request("Invalid header hash")?;
12✔
72

73
        let start_header = self
12✔
74
            .db
12✔
75
            .fetch_header_by_block_hash(start_hash)
12✔
76
            .await
12✔
77
            .rpc_status_internal_error(LOG_TARGET)?
12✔
78
            .ok_or_else(|| RpcStatus::not_found("Start header hash was not found"))?;
12✔
79

80
        // Resolve the end of the requested range in the same way.
        let end_hash = msg
11✔
81
            .end_header_hash
11✔
82
            .clone()
11✔
83
            .try_into()
11✔
84
            .rpc_status_bad_request("Invalid header hash")?;
11✔
85

86
        let end_header = self
11✔
87
            .db
11✔
88
            .fetch_header_by_block_hash(end_hash)
11✔
89
            .await
11✔
90
            .rpc_status_internal_error(LOG_TARGET)?
11✔
91
            .ok_or_else(|| RpcStatus::not_found("End header hash was not found"))?;
11✔
92
        // Reject an inverted range; equal heights are allowed.
        if start_header.height > end_header.height {
10✔
93
            return Err(RpcStatus::bad_request(&format!(
×
94
                "Start header height({}) cannot be greater than the end header height({})",
×
95
                start_header.height, end_header.height
×
96
            )));
×
97
        }
10✔
98

10✔
99
        // The join handle is not kept, so the streaming task runs detached from this call.
        task::spawn(async move {
10✔
100
            debug!(
10✔
101
                target: LOG_TARGET,
102
                "Starting UTXO stream for peer '{}'", self.peer_node_id
×
103
            );
104
            if let Err(err) = self.start_streaming(&mut tx, start_header, end_header).await {
289✔
105
                debug!(
×
106
                    target: LOG_TARGET,
107
                    "UTXO stream errored for peer '{}': {}", self.peer_node_id, err
×
108
                );
109
                // Best effort: the result of forwarding the error to the peer is deliberately ignored.
                let _result = tx.send(Err(err)).await;
×
110
            }
10✔
111
            debug!(
10✔
112
                target: LOG_TARGET,
113
                "UTXO stream completed for peer '{}'", self.peer_node_id
×
114
            );
115
            // NOTE(review): the matching increment is not in this file - presumably done by the caller; confirm.
            #[cfg(feature = "metrics")]
116
            metrics::active_sync_peers().dec();
10✔
117
        });
10✔
118

10✔
119
        Ok(())
10✔
120
    }
12✔
121

122
    /// Streams TXO data to `tx` one block at a time, starting at `current_header` and advancing by height until
    /// `end_header.height` has been processed.
    ///
    /// For every block the following responses are sent, each tagged with that block's header hash:
    /// * `Txo::Output` for each output in the block that is neither burned nor flagged as spent
    /// * `Txo::Commitment` for each input in the block whose spent output is not reported as mined at or above the
    ///   starting header's height
    ///
    /// When streaming starts at height 1 on a pruned node, commitments of genesis block outputs that the database no
    /// longer returns are appended to the responses of the first block.
    ///
    /// The loop ends early, still returning `Ok(())`, when `tx` is closed or sending fails.
    ///
    /// # Errors
    ///
    /// Returns an `RpcStatus` when a database call fails, when an output cannot be converted to its proto
    /// representation, when the output spent by an input cannot be found, or when the next header is missing.
    #[allow(clippy::too_many_lines)]
123
    async fn start_streaming(
10✔
124
        &self,
10✔
125
        tx: &mut mpsc::Sender<Result<SyncUtxosResponse, RpcStatus>>,
10✔
126
        mut current_header: BlockHeader,
10✔
127
        end_header: BlockHeader,
10✔
128
    ) -> Result<(), RpcStatus> {
10✔
129
        debug!(
130
            target: LOG_TARGET,
131
            "Starting stream task with current_header: {}, end_header: {}",
×
132
            current_header.hash().to_hex(),
×
133
            end_header.hash().to_hex(),
×
134
        );
135

136
        // If this is a pruned node and outputs have been requested for an initial sync, we need to discover and send
137
        // the outputs from the genesis block that have been pruned as well
138
        let mut pruned_genesis_block_outputs = Vec::new();
10✔
139
        let metadata = self
10✔
140
            .db
10✔
141
            .get_chain_metadata()
10✔
142
            .await
10✔
143
            .rpc_status_internal_error(LOG_TARGET)?;
10✔
144
        // "Initial sync" is detected here as a stream that starts at height 1.
        if current_header.height == 1 && metadata.is_pruned_node() {
1✔
145
            let genesis_block = self.db.fetch_genesis_block();
1✔
146
            for output in genesis_block.block().body.outputs() {
1✔
147
                let output_hash = output.hash();
1✔
148
                // A genesis output the database no longer returns is reported to the peer by commitment only.
                if self
1✔
149
                    .db
1✔
150
                    .fetch_output(output_hash)
1✔
151
                    .await
1✔
152
                    .rpc_status_internal_error(LOG_TARGET)?
1✔
153
                    .is_none()
1✔
154
                {
155
                    trace!(
156
                        target: LOG_TARGET,
157
                        "Spent genesis TXO (commitment '{}') to peer",
×
158
                        output.commitment.to_hex()
×
159
                    );
160
                    pruned_genesis_block_outputs.push(Ok(SyncUtxosResponse {
1✔
161
                        txo: Some(Txo::Commitment(output.commitment.as_bytes().to_vec())),
1✔
162
                        mined_header: current_header.hash().to_vec(),
1✔
163
                    }));
1✔
164
                }
×
165
            }
166
        }
9✔
167

168
        // Keep the first header: it identifies the first loop iteration and provides the height floor used when
        // filtering inputs below.
        let start_header = current_header.clone();
10✔
169
        loop {
170
            let timer = Instant::now();
80✔
171
            let current_header_hash = current_header.hash();
80✔
172
            debug!(
173
                target: LOG_TARGET,
174
                "Streaming TXO(s) for block #{} ({})",
×
175
                current_header.height,
×
176
                current_header_hash.to_hex()
×
177
            );
178
            // The channel is re-checked before and after each database call so no further work is done once it is
            // closed.
            if tx.is_closed() {
80✔
179
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
180
                break;
×
181
            }
80✔
182

183
            // NOTE(review): presumably the spend state is evaluated relative to the end header passed here - confirm
            // against `fetch_outputs_in_block_with_spend_state`. `end_header.hash()` is also recomputed on every
            // iteration although `end_header` never changes.
            let outputs_with_statuses = self
80✔
184
                .db
80✔
185
                .fetch_outputs_in_block_with_spend_state(current_header_hash, Some(end_header.hash()))
80✔
186
                .await
80✔
187
                .rpc_status_internal_error(LOG_TARGET)?;
80✔
188
            if tx.is_closed() {
80✔
189
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
190
                break;
×
191
            }
80✔
192

80✔
193
            // Burned outputs and outputs flagged as spent are dropped; the rest are converted to proto and queued.
            let mut outputs = Vec::with_capacity(outputs_with_statuses.len());
80✔
194
            for (output, spent) in outputs_with_statuses {
294✔
195
                if output.is_burned() {
214✔
196
                    continue;
×
197
                }
214✔
198
                if !spent {
214✔
199
                    match proto::types::TransactionOutput::try_from(output.clone()) {
204✔
200
                        Ok(tx_ouput) => {
204✔
201
                            trace!(
202
                                target: LOG_TARGET,
203
                                "Unspent TXO (commitment '{}') to peer",
×
204
                                output.commitment.to_hex()
×
205
                            );
206
                            outputs.push(Ok(SyncUtxosResponse {
204✔
207
                                txo: Some(Txo::Output(tx_ouput)),
204✔
208
                                mined_header: current_header_hash.to_vec(),
204✔
209
                            }));
204✔
210
                        },
211
                        Err(e) => {
×
212
                            return Err(RpcStatus::general(&format!(
×
213
                                "Output '{}' RPC conversion error ({})",
×
214
                                output.hash().to_hex(),
×
215
                                e
×
216
                            )))
×
217
                        },
218
                    }
219
                }
10✔
220
            }
221
            debug!(
222
                target: LOG_TARGET,
223
                "Adding {} outputs in response for block #{} '{}'", outputs.len(),
×
224
                current_header.height,
225
                current_header_hash
226
            );
227

228
            let inputs_in_block = self
80✔
229
                .db
80✔
230
                .fetch_inputs_in_block(current_header_hash)
80✔
231
                .await
80✔
232
                .rpc_status_internal_error(LOG_TARGET)?;
80✔
233
            if tx.is_closed() {
80✔
234
                debug!(target: LOG_TARGET, "Peer '{}' exited TXO sync session early", self.peer_node_id);
×
235
                break;
×
236
            }
80✔
237

80✔
238
            let mut inputs = Vec::with_capacity(inputs_in_block.len());
80✔
239
            for input in inputs_in_block {
109✔
240
                // True when the spent output was mined at or above the height this stream started from; false also
                // when the database returns no record for it.
                let output_from_current_tranche = if let Some(mined_info) = self
29✔
241
                    .db
29✔
242
                    .fetch_output(input.output_hash())
29✔
243
                    .await
29✔
244
                    .rpc_status_internal_error(LOG_TARGET)?
29✔
245
                {
246
                    mined_info.mined_height >= start_header.height
29✔
247
                } else {
248
                    false
×
249
                };
250

251
                if output_from_current_tranche {
29✔
252
                    trace!(target: LOG_TARGET, "Spent TXO (hash '{}') not sent to peer", input.output_hash().to_hex());
×
253
                } else {
254
                    // NOTE(review): this is a second `fetch_output` call for the same hash as above; the first
                    // result could presumably be reused - confirm before changing.
                    let input_commitment = match self.db.fetch_output(input.output_hash()).await {
19✔
255
                        Ok(Some(o)) => o.output.commitment,
19✔
256
                        Ok(None) => {
257
                            return Err(RpcStatus::general(&format!(
×
258
                                "Mined info for input '{}' not found",
×
259
                                input.output_hash().to_hex()
×
260
                            )))
×
261
                        },
262
                        Err(e) => {
×
263
                            return Err(RpcStatus::general(&format!(
×
264
                                "Input '{}' not found ({})",
×
265
                                input.output_hash().to_hex(),
×
266
                                e
×
267
                            )))
×
268
                        },
269
                    };
270
                    trace!(target: LOG_TARGET, "Spent TXO (commitment '{}') to peer", input_commitment.to_hex());
×
271
                    inputs.push(Ok(SyncUtxosResponse {
19✔
272
                        txo: Some(Txo::Commitment(input_commitment.as_bytes().to_vec())),
19✔
273
                        mined_header: current_header_hash.to_vec(),
19✔
274
                    }));
19✔
275
                }
276
            }
277
            debug!(
278
                target: LOG_TARGET,
279
                "Adding {} inputs in response for block #{} '{}'", inputs.len(),
×
280
                current_header.height,
281
                current_header_hash
282
            );
283

284
            // Responses for a block are ordered: outputs first, then input commitments.
            let mut txos = Vec::with_capacity(outputs.len() + inputs.len());
80✔
285
            txos.append(&mut outputs);
80✔
286
            txos.append(&mut inputs);
80✔
287
            // Only true on the first iteration; `append` moves the collected genesis commitments into `txos`.
            if start_header == current_header {
80✔
288
                debug!(
289
                    target: LOG_TARGET,
290
                    "Adding {} genesis block pruned inputs in response for block #{} '{}'", pruned_genesis_block_outputs.len(),
×
291
                    current_header.height,
292
                    current_header_hash
293
                );
294
                txos.append(&mut pruned_genesis_block_outputs);
10✔
295
            }
70✔
296
            let txos = txos.into_iter();
80✔
297

80✔
298
            // Ensure task stops if the peer prematurely stops their RPC session
80✔
299
            let txos_len = txos.len();
80✔
300
            if utils::mpsc::send_all(tx, txos).await.is_err() {
80✔
301
                break;
×
302
            }
80✔
303

304
            debug!(
305
                target: LOG_TARGET,
306
                "Streamed {} TXOs in {:.2?} (including stream backpressure)",
×
307
                txos_len,
×
308
                timer.elapsed()
×
309
            );
310

311
            // The block at `end_header.height` has just been streamed when this holds, so the range is complete.
            if current_header.height + 1 > end_header.height {
80✔
312
                break;
10✔
313
            }
70✔
314

315
            // Advance by height rather than by following hashes.
            current_header = self
70✔
316
                .db
70✔
317
                .fetch_header(current_header.height + 1)
70✔
318
                .await
70✔
319
                .rpc_status_internal_error(LOG_TARGET)?
70✔
320
                .ok_or_else(|| {
70✔
321
                    RpcStatus::general(&format!(
×
322
                        "Potential data consistency issue: header {} not found",
×
323
                        current_header.height + 1
×
324
                    ))
×
325
                })?;
70✔
326
        }
327

328
        // This point is also reached after an early `break`, so the message is logged in that case too.
        debug!(
329
            target: LOG_TARGET,
330
            "TXO sync completed to Header hash = {}",
×
331
            current_header.hash().to_hex()
×
332
        );
333

334
        Ok(())
10✔
335
    }
10✔
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc