
tari-project / tari / 19571297598

21 Nov 2025 01:01PM UTC coverage: 61.372% (+0.01%) from 61.362%
fix: store completed transactions with corresponding sent_output_hash (#7595)

Description
---

Updates `send_many_one_sided_transactions` so that completed
transactions, which are stored for each recipient, have a single
corresponding output hash.

This fix simplifies the code and also fixes a possible bug in how
corresponding outputs were found. They were previously looked up by
`value`, which would yield incorrect results if multiple payments with
the same value were sent using this function.

I inspected the **Transaction Builder**, specifically its
`add_stealth_recipient()` and `build()` methods. The order of
`sent_output_hashes` and `sent_outputs` is exactly the order in which
they were originally added to the transaction builder.
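
To make the intended pairing concrete, here is a minimal, hypothetical sketch (the types and the `build_completed_transactions` helper are illustrative only, not the actual wallet code): each recipient's completed transaction is stored with the single output hash produced for that recipient, by zipping recipients and output hashes in the order the builder added them, rather than searching outputs by `value`.

```rust
// Illustrative sketch only; `Recipient`, `CompletedTransaction` and
// `build_completed_transactions` are hypothetical names, not Tari APIs.
struct Recipient {
    address: String,
    value: u64,
}

struct CompletedTransaction {
    recipient_address: String,
    value: u64,
    // Exactly one output hash per recipient, rather than the full list.
    sent_output_hash: [u8; 32],
}

/// Pair each recipient with the output hash created for them, relying on the
/// builder returning outputs in the same order the recipients were added.
fn build_completed_transactions(
    recipients: &[Recipient],
    sent_output_hashes: &[[u8; 32]],
) -> Vec<CompletedTransaction> {
    recipients
        .iter()
        .zip(sent_output_hashes.iter())
        .map(|(recipient, hash)| CompletedTransaction {
            recipient_address: recipient.address.clone(),
            value: recipient.value,
            sent_output_hash: *hash,
        })
        .collect()
}
```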

Motivation and Context
---

The gRPC `transfer` call has a feature (enabled by setting `single_tx=true`)
for sending a single transaction to multiple recipients.
But when the resulting transaction was queried for each recipient, it
returned all `sent_output_hashes` instead of only the one belonging to
that particular recipient.

This change fixes that, so `sent_output_hashes` always contains a single
entry: the output hash corresponding to that recipient.
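
Building on the hypothetical helper sketched above, a small test-style example of the guarantee this change aims for: two recipients paying identical amounts still each receive exactly one, distinct `sent_output_hash`, which a value-based lookup could not promise.

```rust
#[test]
fn each_recipient_gets_exactly_one_output_hash() {
    // Two payments with the same value, which previously made the
    // value-based lookup ambiguous.
    let recipients = vec![
        Recipient { address: "alice".into(), value: 1000 },
        Recipient { address: "bob".into(), value: 1000 },
    ];
    let hashes = [[1u8; 32], [2u8; 32]];

    let completed = build_completed_transactions(&recipients, &hashes);

    assert_eq!(completed.len(), 2);
    assert_eq!(completed[0].sent_output_hash, [1u8; 32]);
    assert_eq!(completed[1].sent_output_hash, [2u8; 32]);
}
```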

How Has This Been Tested?
---

What process can a PR reviewer use to test or verify this change?
---

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Refactor**
  * Improved multi-recipient one-side... (continued)

0 of 29 new or added lines in 1 file covered. (0.0%)

25 existing lines in 8 files now uncovered.

70439 of 114774 relevant lines covered (61.37%)

303323.6 hits per line

Source File

/base_layer/core/src/base_node/sync/block_sync/synchronizer.rs (57.14% covered)
//  Copyright 2020, The Tari Project
//
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
//  following conditions are met:
//
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
//  disclaimer.
//
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
//  following disclaimer in the documentation and/or other materials provided with the distribution.
//
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
//  products derived from this software without specific prior written permission.
//
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
    convert::{TryFrom, TryInto},
    sync::Arc,
    time::{Duration, Instant},
};

use futures::StreamExt;
use log::*;
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId, protocol::rpc::RpcClient, PeerConnection};
use tari_node_components::blocks::{Block, ChainBlock};
use tari_transaction_components::{aggregated_body::AggregateBody, BanPeriod};
use tari_utilities::hex::Hex;

use super::error::BlockSyncError;
use crate::{
    base_node::{
        sync::{ban::PeerBanManager, hooks::Hooks, rpc, SyncPeer},
        BlockchainSyncConfig,
    },
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
    common::rolling_avg::RollingAverageTime,
    proto::base_node::SyncBlocksRequest,
    validation::{BlockBodyValidator, ValidationError},
};
const LOG_TARGET: &str = "c::bn::block_sync";

const MAX_LATENCY_INCREASES: usize = 5;

pub struct BlockSynchronizer<'a, B> {
    config: BlockchainSyncConfig,
    db: AsyncBlockchainDb<B>,
    connectivity: ConnectivityRequester,
    sync_peers: &'a mut Vec<SyncPeer>,
    block_validator: Arc<dyn BlockBodyValidator<B>>,
    hooks: Hooks,
    peer_ban_manager: PeerBanManager,
}

impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
    pub fn new(
        config: BlockchainSyncConfig,
        db: AsyncBlockchainDb<B>,
        connectivity: ConnectivityRequester,
        sync_peers: &'a mut Vec<SyncPeer>,
        block_validator: Arc<dyn BlockBodyValidator<B>>,
    ) -> Self {
        let peer_ban_manager = PeerBanManager::new(config.clone(), connectivity.clone());
        Self {
            config,
            db,
            connectivity,
            sync_peers,
            block_validator,
            hooks: Default::default(),
            peer_ban_manager,
        }
    }

    pub fn on_starting<H>(&mut self, hook: H)
    where for<'r> H: FnOnce(&SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_starting_hook(hook);
    }

    pub fn on_progress<H>(&mut self, hook: H)
    where H: Fn(Arc<ChainBlock>, u64, &SyncPeer) + Send + Sync + 'static {
        self.hooks.add_on_progress_block_hook(hook);
    }

    pub fn on_complete<H>(&mut self, hook: H)
    where H: Fn(Arc<ChainBlock>, u64) + Send + Sync + 'static {
        self.hooks.add_on_complete_hook(hook);
    }

    pub async fn synchronize(&mut self) -> Result<(), BlockSyncError> {
        let mut max_latency = self.config.initial_max_sync_latency;
        let mut sync_round = 0;
        let mut latency_increases_counter = 0;
        loop {
            match self.attempt_block_sync(max_latency).await {
                Ok(_) => return Ok(()),
                Err(err @ BlockSyncError::AllSyncPeersExceedLatency) => {
                    warn!(target: LOG_TARGET, "{err}");
                    max_latency += self.config.max_latency_increase;
                    warn!(
                        target: LOG_TARGET,
                        "Retrying block sync with increased max latency {:.2?} with {} sync peers",
                        max_latency,
                        self.sync_peers.len()
                    );
                    latency_increases_counter += 1;
                    if latency_increases_counter > MAX_LATENCY_INCREASES {
                        return Err(err);
                    }
                    // Prohibit using a few slow sync peers only, rather get new sync peers assigned
                    if self.sync_peers.len() < 2 {
                        return Err(err);
                    } else {
                        continue;
                    }
                },
                Err(err @ BlockSyncError::SyncRoundFailed) => {
                    sync_round += 1;
                    warn!(target: LOG_TARGET, "{err} ({sync_round})");
                    continue;
                },
                Err(err) => {
                    return Err(err);
                },
            }
        }
    }

    async fn attempt_block_sync(&mut self, max_latency: Duration) -> Result<(), BlockSyncError> {
        let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
        info!(
            target: LOG_TARGET,
            "Attempting to sync blocks({} sync peers)",
            sync_peer_node_ids.len()
        );
        let mut latency_counter = 0usize;
        for node_id in sync_peer_node_ids {
            let peer_index = self.get_sync_peer_index(&node_id).ok_or(BlockSyncError::PeerNotFound)?;
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked");
            self.hooks.call_on_starting_hook(sync_peer);
            let mut conn = match self.connect_to_sync_peer(node_id.clone()).await {
                Ok(val) => val,
                Err(e) => {
                    warn!(
                        target: LOG_TARGET,
                        "Failed to connect to sync peer `{node_id}`: {e}"
                    );
                    self.remove_sync_peer(&node_id);
                    continue;
                },
            };
            let config = RpcClient::builder()
                .with_deadline(self.config.rpc_deadline)
                .with_deadline_grace_period(Duration::from_secs(5));
            let mut client = match conn
                .connect_rpc_using_builder::<rpc::BaseNodeSyncRpcClient>(config)
                .await
            {
                Ok(val) => val,
                Err(e) => {
                    warn!(
                        target: LOG_TARGET,
                        "Failed to obtain RPC connection from sync peer `{node_id}`: {e}"
                    );
                    self.remove_sync_peer(&node_id);
                    continue;
                },
            };
            let latency = client
                .get_last_request_latency()
                .expect("unreachable panic: last request latency must be set after connect");
            self.sync_peers
                .get_mut(peer_index)
                .expect("Already checked")
                .set_latency(latency);
            let sync_peer = self.sync_peers.get(peer_index).expect("Already checked").clone();
            info!(
                target: LOG_TARGET,
                "Attempting to synchronize blocks with `{node_id}` latency: {latency:.2?}"
            );
            match self.synchronize_blocks(sync_peer, client, max_latency).await {
                Ok(_) => return Ok(()),
                Err(err) => {
                    warn!(target: LOG_TARGET, "{err}");
                    let ban_reason = BlockSyncError::get_ban_reason(&err);
                    if let Some(reason) = ban_reason {
                        let duration = match reason.ban_duration {
                            BanPeriod::Short => self.config.short_ban_period,
                            BanPeriod::Long => self.config.ban_period,
                        };
                        self.peer_ban_manager
                            .ban_peer_if_required(&node_id, reason.reason, duration)
                            .await;
                    }
                    if let BlockSyncError::MaxLatencyExceeded { .. } = err {
                        latency_counter += 1;
                    } else {
                        self.remove_sync_peer(&node_id);
                    }
                },
            }
        }

        if self.sync_peers.is_empty() {
            Err(BlockSyncError::NoMoreSyncPeers("Block sync failed".to_string()))
        } else if latency_counter >= self.sync_peers.len() {
            Err(BlockSyncError::AllSyncPeersExceedLatency)
        } else {
            Err(BlockSyncError::SyncRoundFailed)
        }
    }

    async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
        let connection = self.connectivity.dial_peer(peer).await?;
        Ok(connection)
    }

    #[allow(clippy::too_many_lines)]
    async fn synchronize_blocks(
        &mut self,
        mut sync_peer: SyncPeer,
        mut client: rpc::BaseNodeSyncRpcClient,
        max_latency: Duration,
    ) -> Result<(), BlockSyncError> {
        info!(target: LOG_TARGET, "Starting block sync from peer {}", sync_peer.node_id());

        let tip_header = self.db.fetch_last_header().await?;
        let local_metadata = self.db.get_chain_metadata().await?;

        if tip_header.height <= local_metadata.best_block_height() {
            debug!(
                target: LOG_TARGET,
                "Blocks already synchronized to height {}.", tip_header.height
            );
            return Ok(());
        }

        let tip_hash = tip_header.hash();
        let tip_height = tip_header.height;
        let best_height = local_metadata.best_block_height();
        let chain_header = self.db.fetch_chain_header(best_height).await?;

        let best_full_block_hash = chain_header.accumulated_data().hash;
        debug!(
            target: LOG_TARGET,
            "Starting block sync from peer `{}`. Current best block is #{} `{}`. Syncing to #{} ({}).",
            sync_peer,
            best_height,
            best_full_block_hash.to_hex(),
            tip_height,
            tip_hash.to_hex()
        );
        let request = SyncBlocksRequest {
            start_hash: best_full_block_hash.to_vec(),
            // To the tip!
            end_hash: tip_hash.to_vec(),
        };

        let mut block_stream = client.sync_blocks(request).await?;
        let mut prev_hash = best_full_block_hash;
        let mut current_block = None;
        let mut last_sync_timer = Instant::now();
        let mut avg_latency = RollingAverageTime::new(20);
        while let Some(block_result) = block_stream.next().await {
            let latency = last_sync_timer.elapsed();
            avg_latency.add_sample(latency);
            let block_body_response = block_result?;

            let header = self
                .db
                .fetch_chain_header_by_block_hash(block_body_response.hash.clone().try_into()?)
                .await?
                .ok_or_else(|| {
                    BlockSyncError::UnknownHeaderHash(format!(
                        "Peer sent hash ({}) for block header we do not have",
                        block_body_response.hash.to_hex()
                    ))
                })?;

            let current_height = header.height();
            let header_hash = *header.hash();
            let timestamp = header.timestamp();

            if header.header().prev_hash != prev_hash {
                return Err(BlockSyncError::BlockWithoutParent {
                    expected: prev_hash.to_hex(),
                    got: header.header().prev_hash.to_hex(),
                });
            }

            prev_hash = header_hash;

            let body = block_body_response
                .body
                .map(AggregateBody::try_from)
                .ok_or_else(|| BlockSyncError::InvalidBlockBody("Peer sent empty block".to_string()))?
                .map_err(BlockSyncError::InvalidBlockBody)?;

            debug!(
                target: LOG_TARGET,
                "Validating block body #{} (PoW = {}, {}, latency: {:.2?})",
                current_height,
                header.header().pow_algo(),
                body.to_counts_string(),
                latency
            );

            let timer = Instant::now();
            let (header, header_accum_data) = header.into_parts();
            let block = Block::new(header, body);

            // Validate the block inside a tokio task
            let task_block = block.clone();
            let db = self.db.inner().clone();
            let validator = self.block_validator.clone();
            let res = {
                let txn = db.db_read_access()?;
                validator.validate_body(&*txn, &task_block)
            };

            let block = match res {
                Ok(block) => block,
                Err(err @ ValidationError::BadBlockFound { .. }) | Err(err @ ValidationError::FatalStorageError(_)) => {
                    return Err(err.into());
                },
                Err(err) => {
                    // Add to bad blocks
                    if let Err(err) = self
                        .db
                        .write_transaction()
                        .delete_orphan(header_hash)
                        .insert_bad_block(header_hash, current_height, err.to_string())
                        .commit()
                        .await
                    {
                        error!(target: LOG_TARGET, "Failed to insert bad block: {err}");
                    }
                    return Err(err.into());
                },
            };

            let block = ChainBlock::try_construct(Arc::new(block), header_accum_data)
                .map(Arc::new)
                .ok_or(BlockSyncError::FailedToConstructChainBlock)?;

            debug!(
                target: LOG_TARGET,
                "Validated in {:.0?}. Storing block body #{} (PoW = {}, {})",
                timer.elapsed(),
                block.header().height,
                block.header().pow_algo(),
                block.block().body.to_counts_string(),
            );
            trace!(
                target: LOG_TARGET,
                "{block}"
            );

            self.db
                .write_transaction()
                .delete_orphan(header_hash)
                .insert_tip_block_body(block.clone())
                .set_best_block(
                    block.height(),
                    header_hash,
                    block.accumulated_data().total_accumulated_difficulty,
                    block.header().prev_hash,
                    timestamp,
                )
                .commit()
                .await?;

            // Average time between receiving blocks from the peer - used to detect a slow sync peer
            let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
            if let Some(latency) = last_avg_latency {
                sync_peer.set_latency(latency);
            }
            // Includes time to add block to database, used to show blocks/s on status line
            sync_peer.add_sample(last_sync_timer.elapsed());
            self.hooks
                .call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);

            if let Some(avg_latency) = last_avg_latency {
                if avg_latency > max_latency {
                    return Err(BlockSyncError::MaxLatencyExceeded {
                        peer: sync_peer.node_id().clone(),
                        latency: avg_latency,
                        max_latency,
                    });
                }
            }

            current_block = Some(block);
            last_sync_timer = Instant::now();
        }
        debug!(
            "Sync peer claim at start  - height: {}, accumulated difficulty: {}",
            sync_peer.claimed_chain_metadata().best_block_height(),
            sync_peer.claimed_chain_metadata().accumulated_difficulty(),
        );
        debug!(
            "Our best header at start  - height: {}, accumulated difficulty: {}",
            best_height,
            chain_header.accumulated_data().total_accumulated_difficulty,
        );
        let metadata_after_sync = self.db.get_chain_metadata().await?;
        debug!(
            "Our best block after sync - height: {}, accumulated difficulty: {}",
            metadata_after_sync.best_block_height(),
            metadata_after_sync.accumulated_difficulty(),
        );

        if metadata_after_sync.accumulated_difficulty() < sync_peer.claimed_chain_metadata().accumulated_difficulty() {
            return Err(BlockSyncError::PeerDidNotSupplyAllClaimedBlocks(format!(
                "Their claim - height: {}, accumulated difficulty: {}. Our status after block sync - height: {}, \
                 accumulated difficulty: {}",
                sync_peer.claimed_chain_metadata().best_block_height(),
                sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                metadata_after_sync.best_block_height(),
                metadata_after_sync.accumulated_difficulty(),
            )));
        }

        if let Some(block) = current_block {
            self.hooks.call_on_complete_hooks(block, best_height);
        }

        debug!(target: LOG_TARGET, "Completed block sync with peer `{sync_peer}`");

        Ok(())
    }

    // Sync peers are also removed from the list of sync peers if the ban duration is longer than the short ban period.
    fn remove_sync_peer(&mut self, node_id: &NodeId) {
        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
            self.sync_peers.remove(pos);
        }
    }

    // Helper function to get the index to the node_id inside of the vec of peers
    fn get_sync_peer_index(&mut self, node_id: &NodeId) -> Option<usize> {
        self.sync_peers.iter().position(|p| p.node_id() == node_id)
    }
}