tari-project / tari / 16933396277

13 Aug 2025 09:35AM UTC coverage: 54.463% (+0.2%) from 54.254%
push

github

web-flow
feat: add seed peer exclusion to the proactive dialer (#7396)

Description
---
Added seed peer exclusion to proactive dialing when selecting available
candidates from the peer_db.
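
The filtering step described above can be sketched as follows. This is only an illustration under stated assumptions: `Peer`, `is_banned`, and `select_dial_candidates` are hypothetical names invented for the example and are not the `tari_comms` types or the code changed in this PR.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a peer record (assumption, not the `tari_comms` type).
#[derive(Debug, Clone, PartialEq)]
struct Peer {
    node_id: String,
    is_banned: bool,
}

/// Returns up to `limit` dial candidates, skipping banned peers and any peer
/// whose id appears in `seed_ids`. Hypothetical helper for illustration only.
fn select_dial_candidates<'a>(
    peers: &'a [Peer],
    seed_ids: &HashSet<String>,
    limit: usize,
) -> Vec<&'a Peer> {
    peers
        .iter()
        // Seed peers were already dialled during seed strapping, so leave them out here.
        .filter(|p| !p.is_banned && !seed_ids.contains(&p.node_id))
        .take(limit)
        .collect()
}
```

Calling `select_dial_candidates(&peers, &seed_ids, n)` with the ids of the configured seed peers leaves only non-seed peers as candidates, which matches the intent stated in the description.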

Motivation and Context
---
Seed peers are known entities; they have been dialled during initial
seed_strap, and a well-connected network should try to learn about and
connect to other peers as well.

How Has This Been Tested?
---
System-level testing.

What process can a PR reviewer use to test or verify this change?
---
Code review.


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

## Summary by CodeRabbit

* **Bug Fixes**
  * Improved connection management by excluding seed peers from proactive
    dialing candidates, enhancing network stability and reducing unnecessary
    connection attempts and failed dials.

* **Documentation**
  * Added a brief doc comment describing how to retrieve the list of seed
    peers.

* **Tests**
  * Expanded test coverage to validate discovery and syncing behavior when
    seed peers are present and when filtering by external addresses.

41 of 42 new or added lines in 3 files covered. (97.62%)

1673 existing lines in 28 files now uncovered.

76415 of 140305 relevant lines covered (54.46%)

194087.6 hits per line
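
The percentages above appear to be plain covered/relevant ratios. A minimal sketch of that arithmetic follows; `coverage_percent` is a hypothetical helper written for this example and is not part of the coverage service or the Tari codebase.

```rust
/// Percentage of `covered` lines out of `relevant` lines, or `None` when
/// there are no relevant lines. Hypothetical helper for illustration only.
fn coverage_percent(covered: u64, relevant: u64) -> Option<f64> {
    if relevant == 0 {
        return None;
    }
    Some(covered as f64 / relevant as f64 * 100.0)
}
```

With the figures reported for this build, `coverage_percent(76_415, 140_305)` rounds to 54.46 and `coverage_percent(41, 42)` rounds to 97.62.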

Source File

0.0
/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
// Copyright 2019. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[cfg(feature = "metrics")]
use std::convert::{TryFrom, TryInto};
use std::{cmp::max, collections::HashSet, sync::Arc, time::Instant};

use log::*;
use strum_macros::Display;
use tari_common_types::types::{BlockHash, FixedHash, HashOutput};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_utilities::hex::Hex;
use tokio::sync::RwLock;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    base_node::comms_interface::{
        comms_response::ValidatorNodeChange,
        error::CommsInterfaceError,
        local_interface::BlockEventSender,
        FetchMempoolTransactionsResponse,
        NodeCommsRequest,
        NodeCommsResponse,
        OutboundNodeCommsInterface,
    },
    blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate},
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError},
    consensus::{ConsensusConstants, ConsensusManager},
    mempool::Mempool,
    proof_of_work::{
        cuckaroo_pow::cuckaroo_difficulty,
        monero_randomx_difficulty,
        randomx_factory::RandomXFactory,
        sha3x_difficulty,
        tari_randomx_difficulty,
        Difficulty,
        PowAlgorithm,
        PowError,
    },
    transactions::aggregated_body::AggregateBody,
    validation::{helpers, tari_rx_vm_key_height, ValidationError},
};

const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
const MAX_REQUEST_BY_BLOCK_HASHES: usize = 100;
const MAX_REQUEST_BY_KERNEL_EXCESS_SIGS: usize = 100;
const MAX_REQUEST_BY_UTXO_HASHES: usize = 100;
const MAX_MEMPOOL_TIMEOUT: u64 = 150;

/// Events that can be published on the Validated Block Event Stream
/// Broadcast is to notify subscribers if this is a valid propagated block event
#[derive(Debug, Clone, Display)]
pub enum BlockEvent {
    ValidBlockAdded(Arc<Block>, BlockAddResult),
    AddBlockValidationFailed {
        block: Arc<Block>,
        source_peer: Option<NodeId>,
    },
    AddBlockErrored {
        block: Arc<Block>,
    },
    BlockSyncComplete(Arc<ChainBlock>, u64),
    BlockSyncRewind(Vec<Arc<ChainBlock>>),
}

/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
pub struct InboundNodeCommsHandlers<B> {
    block_event_sender: BlockEventSender,
    blockchain_db: AsyncBlockchainDb<B>,
    mempool: Mempool,
    consensus_manager: ConsensusManager,
    list_of_reconciling_blocks: Arc<RwLock<HashSet<HashOutput>>>,
    outbound_nci: OutboundNodeCommsInterface,
    connectivity: ConnectivityRequester,
    randomx_factory: RandomXFactory,
}

impl<B> InboundNodeCommsHandlers<B>
where B: BlockchainBackend + 'static
{
    /// Construct a new InboundNodeCommsInterface.
    pub fn new(
        block_event_sender: BlockEventSender,
        blockchain_db: AsyncBlockchainDb<B>,
        mempool: Mempool,
        consensus_manager: ConsensusManager,
        outbound_nci: OutboundNodeCommsInterface,
        connectivity: ConnectivityRequester,
        randomx_factory: RandomXFactory,
    ) -> Self {
        Self {
            block_event_sender,
            blockchain_db,
            mempool,
            consensus_manager,
            list_of_reconciling_blocks: Arc::new(RwLock::new(HashSet::new())),
            outbound_nci,
            connectivity,
            randomx_factory,
        }
    }

    /// Handle inbound node comms requests from remote nodes and local services.
    #[allow(clippy::too_many_lines)]
    pub async fn handle_request(&self, request: NodeCommsRequest) -> Result<NodeCommsResponse, CommsInterfaceError> {
        trace!(target: LOG_TARGET, "Handling remote request {}", request);
        match request {
            NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata(
                self.blockchain_db.get_chain_metadata().await?,
            )),
            NodeCommsRequest::GetTargetDifficultyNextBlock(algo) => {
                let header = self.blockchain_db.fetch_tip_header().await?;
                let constants = self.consensus_manager.consensus_constants(header.header().height);
                let target_difficulty = self
                    .get_target_difficulty_for_next_block(algo, constants, *header.hash())
                    .await?;
                Ok(NodeCommsResponse::TargetDifficulty(target_difficulty))
            },
            NodeCommsRequest::FetchHeaders(range) => {
                let headers = self.blockchain_db.fetch_chain_headers(range).await?;
                Ok(NodeCommsResponse::BlockHeaders(headers))
            },
            NodeCommsRequest::FetchHeadersByHashes(block_hashes) => {
                if block_hashes.len() > MAX_REQUEST_BY_BLOCK_HASHES {
                    return Err(CommsInterfaceError::InvalidRequest {
                        request: "FetchHeadersByHashes",
                        details: format!(
                            "Exceeded maximum block hashes request (max: {}, got:{})",
                            MAX_REQUEST_BY_BLOCK_HASHES,
                            block_hashes.len()
                        ),
                    });
                }
                let mut block_headers = Vec::with_capacity(block_hashes.len());
                for block_hash in block_hashes {
                    let block_hex = block_hash.to_hex();
                    match self.blockchain_db.fetch_chain_header_by_block_hash(block_hash).await? {
                        Some(block_header) => {
                            block_headers.push(block_header);
                        },
                        None => {
                            error!(target: LOG_TARGET, "Could not fetch headers with hashes:{}", block_hex);
                            return Err(CommsInterfaceError::InternalError(format!(
                                "Could not fetch headers with hashes:{}",
                                block_hex
                            )));
                        },
                    }
                }
                Ok(NodeCommsResponse::BlockHeaders(block_headers))
            },
            NodeCommsRequest::FetchMatchingUtxos(utxo_hashes) => {
                let mut res = Vec::with_capacity(utxo_hashes.len());
                for (output, spent) in (self
                    .blockchain_db
                    .fetch_outputs_with_spend_status_at_tip(utxo_hashes)
                    .await?)
                    .into_iter()
                    .flatten()
                {
                    if !spent {
                        res.push(output);
                    }
                }
                Ok(NodeCommsResponse::TransactionOutputs(res))
            },
            NodeCommsRequest::FetchMatchingBlocks { range, compact } => {
                let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
            },
            NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => {
                if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS {
                    return Err(CommsInterfaceError::InvalidRequest {
                        request: "FetchBlocksByKernelExcessSigs",
                        details: format!(
                            "Exceeded maximum number of kernel excess sigs in request (max: {}, got:{})",
                            MAX_REQUEST_BY_KERNEL_EXCESS_SIGS,
                            excess_sigs.len()
                        ),
                    });
                }
                let mut blocks = Vec::with_capacity(excess_sigs.len());
                for sig in excess_sigs {
                    let sig_hex = sig.get_signature().to_hex();
                    debug!(
                        target: LOG_TARGET,
                        "A peer has requested a block with kernel with sig {}", sig_hex
                    );
                    match self.blockchain_db.fetch_block_with_kernel(sig).await {
                        Ok(Some(block)) => blocks.push(block),
                        Ok(None) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block containing kernel with sig {} to peer because not \
                             stored",
                            sig_hex
                        ),
                        Err(e) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block containing kernel with sig {} to peer because: {}",
                            sig_hex,
                            e
                        ),
                    }
                }
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
            },
            NodeCommsRequest::FetchBlocksByUtxos(commitments) => {
                if commitments.len() > MAX_REQUEST_BY_UTXO_HASHES {
                    return Err(CommsInterfaceError::InvalidRequest {
                        request: "FetchBlocksByUtxos",
                        details: format!(
                            "Exceeded maximum number of utxo hashes in request (max: {}, got:{})",
                            MAX_REQUEST_BY_UTXO_HASHES,
                            commitments.len()
                        ),
                    });
                }
                let mut blocks = Vec::with_capacity(commitments.len());
                for commitment in commitments {
                    let commitment_hex = commitment.to_hex();
                    debug!(
                        target: LOG_TARGET,
                        "A peer has requested a block with commitment {}", commitment_hex,
                    );
                    match self.blockchain_db.fetch_block_with_utxo(commitment).await {
                        Ok(Some(block)) => blocks.push(block),
                        Ok(None) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block with commitment {} to peer because not stored",
                            commitment_hex,
                        ),
                        Err(e) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block with commitment {} to peer because: {}",
                            commitment_hex,
                            e
                        ),
                    }
                }
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
            },
            NodeCommsRequest::GetHeaderByHash(hash) => {
                let header = self.blockchain_db.fetch_chain_header_by_block_hash(hash).await?;
                Ok(NodeCommsResponse::BlockHeader(header))
            },
            NodeCommsRequest::GetBlockByHash(hash) => {
                let block = self.blockchain_db.fetch_block_by_hash(hash, false).await?;
                Ok(NodeCommsResponse::HistoricalBlock(Box::new(block)))
            },
            NodeCommsRequest::GetNewBlockTemplate(request) => {
                let best_block_header = self.blockchain_db.fetch_tip_header().await?;
                let mut last_seen_hash = self.mempool.get_last_seen_hash().await?;
                let mut is_mempool_synced = false;
                let start = Instant::now();
                // this will wait a max of 150ms by default before returning anyway with a potential broken template
                // We need to ensure the mempool has seen the latest base node height before we can be confident the
                // template is correct
                while !is_mempool_synced && start.elapsed().as_millis() < MAX_MEMPOOL_TIMEOUT.into() {
                    if best_block_header.hash() == &last_seen_hash || last_seen_hash == FixedHash::default() {
                        is_mempool_synced = true;
                    } else {
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                        last_seen_hash = self.mempool.get_last_seen_hash().await?;
                    }
                }

                if !is_mempool_synced {
                    warn!(
                        target: LOG_TARGET,
                        "Mempool out of sync - last seen hash '{}' does not match the tip hash '{}'. This condition \
                         should auto correct with the next block template request",
                        last_seen_hash, best_block_header.hash()
                    );
                }
                let mut header = BlockHeader::from_previous(best_block_header.header());
                let constants = self.consensus_manager.consensus_constants(header.height);
                header.version = constants.blockchain_version().into();
                header.pow.pow_algo = request.algo;

                let constants_weight = constants.max_block_transaction_weight();
                let asking_weight = if request.max_weight > constants_weight || request.max_weight == 0 {
                    constants_weight
                } else {
                    request.max_weight
                };

                debug!(
                    target: LOG_TARGET,
                    "Fetching transactions with a maximum weight of {} for the template", asking_weight
                );
                let transactions = self
                    .mempool
                    .retrieve(asking_weight)
                    .await?
                    .into_iter()
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
                    .collect::<Vec<_>>();

                debug!(
                    target: LOG_TARGET,
                    "Adding {} transaction(s) to new block template",
                    transactions.len(),
                );

                let prev_hash = header.prev_hash;
                let height = header.height;

                let block = header.into_builder().with_transactions(transactions).build();
                let block_hash = block.hash();
                let block_template = NewBlockTemplate::from_block(
                    block,
                    self.get_target_difficulty_for_next_block(request.algo, constants, prev_hash)
                        .await?,
                    self.consensus_manager.get_block_reward_at(height),
                    is_mempool_synced,
                )?;

                debug!(target: LOG_TARGET,
                    "New block template requested and prepared at height: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
                    block_template.header.height,
                    block_template.target_difficulty,
                    block_hash.to_hex(),
                    block_template
                        .body
                        .calculate_weight(constants.transaction_weight_params())
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
                    block_template.body.to_counts_string()
                );

                Ok(NodeCommsResponse::NewBlockTemplate(block_template))
            },
            NodeCommsRequest::GetNewBlock(block_template) => {
                let height = block_template.header.height;
                let target_difficulty = block_template.target_difficulty;
                let block = self.blockchain_db.prepare_new_block(block_template).await?;
                let constants = self.consensus_manager.consensus_constants(block.header.height);
                debug!(target: LOG_TARGET,
                    "Prepared block: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
                    height,
                    target_difficulty,
                    block.hash().to_hex(),
                    block
                        .body
                        .calculate_weight(constants.transaction_weight_params())
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
                    block.body.to_counts_string()
                );
                Ok(NodeCommsResponse::NewBlock {
                    success: true,
                    error: None,
                    block: Some(block),
                })
            },
            NodeCommsRequest::GetBlockFromAllChains(hash) => {
                let block_hex = hash.to_hex();
                debug!(
                    target: LOG_TARGET,
                    "A peer has requested a block with hash {}", block_hex
                );

                #[allow(clippy::blocks_in_conditions)]
                let maybe_block = match self
                    .blockchain_db
                    .fetch_block_by_hash(hash, true)
                    .await
                    .unwrap_or_else(|e| {
                        warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block {} to peer because: {}",
                            block_hex,
                            e
                        );

                        None
                    }) {
                    None => self.blockchain_db.fetch_orphan(hash).await.map_or_else(
                        |e| {
                            warn!(
                                target: LOG_TARGET,
                                "Could not provide requested block {} to peer because: {}", block_hex, e,
                            );

                            None
                        },
                        Some,
                    ),
                    Some(block) => Some(block.into_block()),
                };

                Ok(NodeCommsResponse::Block(Box::new(maybe_block)))
            },
            NodeCommsRequest::FetchKernelByExcessSig(signature) => {
                let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await {
                    Ok(Some((kernel, _))) => vec![kernel],
                    Ok(None) => vec![],
                    Err(err) => {
                        error!(target: LOG_TARGET, "Could not fetch kernel {}", err);
                        return Err(err.into());
                    },
                };

                Ok(NodeCommsResponse::TransactionKernels(kernels))
            },
            NodeCommsRequest::FetchMempoolTransactionsByExcessSigs { excess_sigs } => {
                let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
                Ok(NodeCommsResponse::FetchMempoolTransactionsByExcessSigsResponse(
                    FetchMempoolTransactionsResponse {
                        transactions,
                        not_found,
                    },
                ))
            },
            NodeCommsRequest::FetchValidatorNodesKeys {
                height,
                validator_network,
            } => {
                let active_validator_nodes = self
                    .blockchain_db
                    .fetch_active_validator_nodes(height, validator_network)
                    .await?;
                Ok(NodeCommsResponse::FetchValidatorNodesKeysResponse(
                    active_validator_nodes,
                ))
            },
            NodeCommsRequest::GetValidatorNode {
                sidechain_id,
                public_key,
            } => {
                let vn = self.blockchain_db.get_validator_node(sidechain_id, public_key).await?;
                Ok(NodeCommsResponse::GetValidatorNode(vn))
            },
            NodeCommsRequest::FetchTemplateRegistrations {
                start_height,
                end_height,
            } => {
                let template_registrations = self
                    .blockchain_db
                    .fetch_template_registrations(start_height..=end_height)
                    .await?;
                Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
                    template_registrations,
                ))
            },
            NodeCommsRequest::FetchUnspentUtxosInBlock { block_hash } => {
                let utxos = self.blockchain_db.fetch_outputs_in_block(block_hash).await?;
                Ok(NodeCommsResponse::TransactionOutputs(utxos))
            },
            NodeCommsRequest::FetchMinedInfoByPayRef(payref) => {
                let output_info = self.blockchain_db.fetch_mined_info_by_payref(payref).await?;
                Ok(NodeCommsResponse::MinedInfo(output_info))
            },
            NodeCommsRequest::FetchMinedInfoByOutputHash(output_hash) => {
                let output_info = self.blockchain_db.fetch_mined_info_by_output_hash(output_hash).await?;
                Ok(NodeCommsResponse::MinedInfo(output_info))
            },
            NodeCommsRequest::FetchOutputMinedInfo(output_hash) => {
                let output_info = self.blockchain_db.fetch_output(output_hash).await?;
                Ok(NodeCommsResponse::OutputMinedInfo(output_info))
            },
            NodeCommsRequest::CheckOutputSpentStatus(output_hash) => {
                let input_info = self.blockchain_db.fetch_input(output_hash).await?;
                Ok(NodeCommsResponse::InputMinedInfo(input_info))
            },
            NodeCommsRequest::FetchValidatorNodeChanges { epoch, sidechain_id } => {
                let added_validators = self
                    .blockchain_db
                    .fetch_validators_activating_in_epoch(sidechain_id.clone(), epoch)
                    .await?;

                let exit_validators = self
                    .blockchain_db
                    .fetch_validators_exiting_in_epoch(sidechain_id.clone(), epoch)
                    .await?;

                info!(
                    target: LOG_TARGET,
                    "Fetched {} validators activating and {} validators exiting in epoch {}",
                    added_validators.len(),
                    exit_validators.len(),
                    epoch,
                );

                let mut node_changes = Vec::with_capacity(added_validators.len() + exit_validators.len());

                node_changes.extend(added_validators.into_iter().map(|vn| ValidatorNodeChange::Add {
                    registration: vn.original_registration.into(),
                    activation_epoch: vn.activation_epoch,
                    minimum_value_promise: vn.minimum_value_promise,
                    shard_key: vn.shard_key,
                }));

                node_changes.extend(exit_validators.into_iter().map(|vn| ValidatorNodeChange::Remove {
                    public_key: vn.public_key,
                }));

                Ok(NodeCommsResponse::FetchValidatorNodeChangesResponse(node_changes))
            },
        }
    }

521
    /// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous
522
    /// requests for the full block.
523
    /// This may (asynchronously) block until the other request(s) complete or time out and so should typically be
524
    /// executed in a dedicated task.
525
    pub async fn handle_new_block_message(
×
526
        &mut self,
×
527
        new_block: NewBlock,
×
528
        source_peer: NodeId,
×
529
    ) -> Result<(), CommsInterfaceError> {
        let block_hash = new_block.header.hash();

        if self.blockchain_db.inner().is_add_block_disabled() {
            info!(
                target: LOG_TARGET,
                "Ignoring block message ({}) because add_block is locked",
                block_hash.to_hex()
            );
            return Ok(());
        }

        // Let's check if the block exists before we try to ask for a complete block
        if self.check_exists_and_not_bad_block(block_hash).await? {
            return Ok(());
        }

        // Check that the difficulty at least matches 50% of the tip header. The max difficulty drop is 16%, thus
        // 50% is way more than that and in order to attack the node, you need 50% of the mining power. We cannot check
        // the target difficulty as orphan blocks don't have a target difficulty. All we care about here is that bad
        // blocks are not free to make, and that they are more expensive to make than they are to validate. As
        // soon as a block can be linked to the main chain, a proper full proof of work check will
        // be done before any other validation.
        self.check_min_block_difficulty(&new_block).await?;

        {
            // We use a double lock to make sure we can only reconcile one unique block at a time. We may receive the
            // same block from multiple peers nearly simultaneously. We should only reconcile each unique block once.
            let read_lock = self.list_of_reconciling_blocks.read().await;
            if read_lock.contains(&block_hash) {
                debug!(
                    target: LOG_TARGET,
                    "Block with hash `{}` is already being reconciled",
                    block_hash.to_hex()
                );
                return Ok(());
            }
        }
        {
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
            if self.check_exists_and_not_bad_block(block_hash).await? {
                return Ok(());
            }

            if !write_lock.insert(block_hash) {
                debug!(
                    target: LOG_TARGET,
                    "Block with hash `{}` is already being reconciled",
                    block_hash.to_hex()
                );
                return Ok(());
            }
        }

        debug!(
            target: LOG_TARGET,
            "Block with hash `{}` is unknown. Constructing block from known mempool transactions / requesting missing \
             transactions from peer '{}'.",
            block_hash.to_hex(),
            source_peer
        );

        let result = self.reconcile_and_add_block(source_peer.clone(), new_block).await;

        {
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
            write_lock.remove(&block_hash);
        }
        result?;
        Ok(())
    }
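
The double-lock guard used above can be sketched in isolation. This is a minimal, synchronous illustration that assumes `std::sync::RwLock` in place of the async lock held by the handler; `ReconcileGuard`, `try_begin`, and `finish` are hypothetical names, and a `u64` stands in for the block hash.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `list_of_reconciling_blocks`.
#[derive(Clone, Default)]
pub struct ReconcileGuard {
    in_flight: Arc<RwLock<HashSet<u64>>>,
}

impl ReconcileGuard {
    /// Returns true if the caller won the right to reconcile `hash`.
    pub fn try_begin(&self, hash: u64) -> bool {
        {
            // Cheap check under the shared lock first.
            let read = self.in_flight.read().unwrap();
            if read.contains(&hash) {
                return false;
            }
        } // The read guard is dropped here, before the write lock is taken.

        // `insert` returns false if another caller added the hash between the two locks.
        self.in_flight.write().unwrap().insert(hash)
    }

    /// Releases the hash once reconciliation has finished, whether it succeeded or not.
    pub fn finish(&self, hash: u64) {
        self.in_flight.write().unwrap().remove(&hash);
    }
}
```

A caller would pair `try_begin` with `finish` around the reconciliation step, in the same way the handler removes the hash before propagating `result`.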

    /// Checks that the achieved difficulty of a propagated block is at least half the target difficulty of the most
    /// recent chain header mined with the same PoW algorithm, and never below the consensus minimum for that algorithm.
    async fn check_min_block_difficulty(&self, new_block: &NewBlock) -> Result<(), CommsInterfaceError> {
        let constants = self.consensus_manager.consensus_constants(new_block.header.height);
        let gen_hash = *self.consensus_manager.get_genesis_block().hash();
        let mut min_difficulty = constants.min_pow_difficulty(new_block.header.pow.pow_algo);
        let mut header = self.blockchain_db.fetch_last_chain_header().await?;
        loop {
            if new_block.header.pow_algo() == header.header().pow_algo() {
                min_difficulty = max(
                    header
                        .accumulated_data()
                        .target_difficulty
                        .checked_div_u64(2)
                        .unwrap_or(min_difficulty),
                    min_difficulty,
                );
                break;
            }
            if header.height() == 0 {
                break;
            }
            // We have not reached the genesis block, and the PoW algo does not match, so let's go further back
            header = self
                .blockchain_db
                .fetch_chain_header(header.height().saturating_sub(1))
                .await?;
        }
        let achieved = match new_block.header.pow_algo() {
            PowAlgorithm::RandomXM => monero_randomx_difficulty(
                &new_block.header,
                &self.randomx_factory,
                &gen_hash,
                &self.consensus_manager,
            )?,
            PowAlgorithm::Sha3x => sha3x_difficulty(&new_block.header)?,
            PowAlgorithm::RandomXT => {
                let vm_key = *self
                    .blockchain_db
                    .fetch_chain_header(tari_rx_vm_key_height(header.height()))
                    .await?
                    .hash();
                tari_randomx_difficulty(&new_block.header, &self.randomx_factory, &vm_key)?
            },
            PowAlgorithm::Cuckaroo => {
                let constants = self.consensus_manager.consensus_constants(new_block.header.height);
                let cuckaroo_cycle = constants.cuckaroo_cycle_length();
                let edge_bits = constants.cuckaroo_edge_bits();
                cuckaroo_difficulty(&new_block.header, cuckaroo_cycle, edge_bits)?
            },
        };
        if achieved < min_difficulty {
            return Err(CommsInterfaceError::InvalidBlockHeader(
                BlockHeaderValidationError::ProofOfWorkError(PowError::AchievedDifficultyBelowMin),
            ));
        }
        Ok(())
    }
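
The floor computed by the loop above reduces to simple arithmetic once the matching header is found. The sketch below assumes plain `u64` difficulties instead of the crate's `Difficulty` type; `min_acceptable_difficulty` and `passes_min_difficulty` are hypothetical helper names.

```rust
use std::cmp::max;

/// Hypothetical helper: the floor a propagated block must reach.
/// Half of the tip's target difficulty, but never below the consensus minimum.
pub fn min_acceptable_difficulty(tip_target: u64, consensus_min: u64) -> u64 {
    // Dividing by 2 cannot fail for `u64`; the fallback only mirrors the `unwrap_or` in the handler.
    max(tip_target.checked_div(2).unwrap_or(consensus_min), consensus_min)
}

/// Hypothetical helper: true when the achieved difficulty meets the floor.
pub fn passes_min_difficulty(achieved: u64, tip_target: u64, consensus_min: u64) -> bool {
    achieved >= min_acceptable_difficulty(tip_target, consensus_min)
}
```

With a tip target of 1000 and a consensus minimum of 100, the floor is 500; with a tip target of 150 the consensus minimum of 100 takes over.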

    async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
        if self.blockchain_db.chain_header_or_orphan_exists(block).await? {
            debug!(
                target: LOG_TARGET,
                "Block with hash `{}` already stored",
                block.to_hex()
            );
            return Ok(true);
        }
        let (is_bad_block, reason) = self.blockchain_db.bad_block_exists(block).await?;
        if is_bad_block {
            debug!(
                target: LOG_TARGET,
                "Block with hash `{}` already validated as a bad block due to `{}`",
                block.to_hex(), reason
            );
            return Err(CommsInterfaceError::ChainStorageError(
                ChainStorageError::ValidationError {
                    source: ValidationError::BadBlockFound {
                        hash: block.to_hex(),
                        reason,
                    },
                },
            ));
        }
        Ok(false)
    }

    async fn reconcile_and_add_block(
        &mut self,
        source_peer: NodeId,
        new_block: NewBlock,
    ) -> Result<(), CommsInterfaceError> {
        let block = self.reconcile_block(source_peer.clone(), new_block).await?;
        self.handle_block(block, Some(source_peer)).await?;
        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn reconcile_block(
        &mut self,
        source_peer: NodeId,
        new_block: NewBlock,
    ) -> Result<Block, CommsInterfaceError> {
        let NewBlock {
            header,
            coinbase_kernels,
            coinbase_outputs,
            kernel_excess_sigs: excess_sigs,
        } = new_block;
        // If the block is empty, we don't have to ask for the block, as we already have the full block available
        // to us.
        if excess_sigs.is_empty() {
            let block = BlockBuilder::new(header.version)
                .add_outputs(coinbase_outputs)
                .add_kernels(coinbase_kernels)
                .with_header(header)
                .build();
            return Ok(block);
        }

        let block_hash = header.hash();
        // We check the current tip and orphan status of the block because we cannot guarantee that mempool state is
        // correct and the MMR root calculation is only valid if the block is building on the tip.
        let current_meta = self.blockchain_db.get_chain_metadata().await?;
        if header.prev_hash != *current_meta.best_block_hash() {
            debug!(
                target: LOG_TARGET,
                "Orphaned block #{}: ({}), current tip is: #{} ({}). We need to fetch the complete block from peer: \
                 ({})",
                header.height,
                block_hash.to_hex(),
                current_meta.best_block_height(),
                current_meta.best_block_hash().to_hex(),
                source_peer,
            );
            #[allow(clippy::cast_possible_wrap)]
            #[cfg(feature = "metrics")]
            metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
            return Ok(block);
        }

        // We know that the block is neither an orphan nor a coinbase, so let's ask our mempool for the transactions
        let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
        let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);

        let mut builder = BlockBuilder::new(header.version)
            .add_outputs(coinbase_outputs)
            .add_kernels(coinbase_kernels)
            .with_transactions(known_transactions);

        if missing_excess_sigs.is_empty() {
            debug!(
                target: LOG_TARGET,
                "All transactions for block #{} ({}) found in mempool",
                header.height,
                block_hash.to_hex()
            );
        } else {
            debug!(
                target: LOG_TARGET,
                "Requesting {} unknown transaction(s) from peer '{}'.",
                missing_excess_sigs.len(),
                source_peer
            );

            let FetchMempoolTransactionsResponse {
                transactions,
                not_found,
            } = self
                .outbound_nci
                .request_transactions_by_excess_sig(source_peer.clone(), missing_excess_sigs)
                .await?;

            // Add returned transactions to unconfirmed pool
            if !transactions.is_empty() {
                self.mempool.insert_all(transactions.clone()).await?;
            }

            if !not_found.is_empty() {
                warn!(
                    target: LOG_TARGET,
                    "Peer {} was not able to return all transactions for block #{} ({}). {} transaction(s) not found. \
                     Requesting full block.",
                    source_peer,
                    header.height,
                    block_hash.to_hex(),
                    not_found.len()
                );

                #[cfg(feature = "metrics")]
                metrics::compact_block_full_misses(header.height).inc();
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
                return Ok(block);
            }

            builder = builder.with_transactions(
                transactions
                    .into_iter()
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
                    .collect(),
            );
        }

        // NB: Add the header last because `with_transactions` etc updates the current header, but we have the final one
        // already
        builder = builder.with_header(header.clone());
        let block = builder.build();

        // Perform a sanity check on the reconstructed block, if the MMR roots don't match then it's possible one or
        // more transactions in our mempool had the same excess/signature for a *different* transaction.
        // This is extremely unlikely, but still possible. In case of a mismatch, request the full block from the peer.
        let (block, mmr_roots) = match self.blockchain_db.calculate_mmr_roots(block).await {
            Err(_) => {
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
                return Ok(block);
            },
            Ok(v) => v,
        };
        if let Err(e) = helpers::check_mmr_roots(&header, &mmr_roots) {
            warn!(
                target: LOG_TARGET,
                "Reconstructed block #{} ({}) failed MMR check validation!. Requesting full block. Error: {}",
                header.height,
                block_hash.to_hex(),
                e,
            );

            #[cfg(feature = "metrics")]
            metrics::compact_block_mmr_mismatch(header.height).inc();
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
            return Ok(block);
        }

        Ok(block)
    }
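
The mempool lookup at the heart of the reconciliation above amounts to splitting the announced signatures into those already known locally and those that must be requested. The following is a simplified sketch under stated assumptions: a signature is a `u64`, a transaction is a `String`, and `Reconciliation` and `reconcile` are hypothetical names that do not exist in the crate.

```rust
use std::collections::HashMap;

/// Hypothetical outcome of matching announced signatures against a local pool.
#[derive(Debug, PartialEq)]
pub enum Reconciliation {
    /// Every transaction was found locally.
    Complete(Vec<String>),
    /// Some transactions must be requested from the peer, by signature.
    NeedFromPeer { known: Vec<String>, missing: Vec<u64> },
}

/// Splits `excess_sigs` into locally known transactions and missing signatures,
/// preserving the announced order within each group.
pub fn reconcile(mempool: &HashMap<u64, String>, excess_sigs: &[u64]) -> Reconciliation {
    let mut known = Vec::new();
    let mut missing = Vec::new();
    for sig in excess_sigs {
        match mempool.get(sig) {
            Some(tx) => known.push(tx.clone()),
            None => missing.push(*sig),
        }
    }
    if missing.is_empty() {
        Reconciliation::Complete(known)
    } else {
        Reconciliation::NeedFromPeer { known, missing }
    }
}
```

The sketch leaves out the fallbacks the handler applies afterwards: requesting the full block when the peer cannot supply the missing transactions or when the MMR roots of the rebuilt block do not match the header.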

    async fn request_full_block_from_peer(
        &mut self,
        source_peer: NodeId,
        block_hash: BlockHash,
    ) -> Result<Block, CommsInterfaceError> {
        match self
            .outbound_nci
            .request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone()))
            .await
        {
            Ok(Some(block)) => Ok(block),
            Ok(None) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer `{}` failed to return the block that was requested.", source_peer
                );
                Err(CommsInterfaceError::InvalidPeerResponse(format!(
                    "Invalid response from peer `{}`: Peer failed to provide the block that was propagated",
                    source_peer
                )))
            },
            Err(CommsInterfaceError::UnexpectedApiResponse) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer `{}` sent unexpected API response.", source_peer
                );
                Err(CommsInterfaceError::UnexpectedApiResponse)
            },
            Err(e) => Err(e),
        }
    }
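
The response handling above treats an empty but successful reply as a peer fault. A reduced sketch of that mapping, assuming a `String` in place of `Block` and a hypothetical two-variant `FetchError` in place of `CommsInterfaceError`:

```rust
/// Hypothetical, reduced error type for illustration only.
#[derive(Debug, PartialEq)]
pub enum FetchError {
    InvalidPeerResponse(String),
    UnexpectedApiResponse,
}

/// Turns "the peer answered, but without the block" into an error that names the peer.
pub fn require_block(response: Result<Option<String>, FetchError>, peer: &str) -> Result<String, FetchError> {
    match response {
        Ok(Some(block)) => Ok(block),
        Ok(None) => Err(FetchError::InvalidPeerResponse(format!(
            "Invalid response from peer `{}`: Peer failed to provide the block that was propagated",
            peer
        ))),
        // Any other error is passed through unchanged.
        Err(e) => Err(e),
    }
}
```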

    /// Handle inbound blocks from remote nodes and local services.
    ///
    /// ## Arguments
    /// block - the block to store
    /// source_peer - the peer that sent this new block message, or None if the block was generated by a local miner
    pub async fn handle_block(
        &mut self,
        block: Block,
        source_peer: Option<NodeId>,
    ) -> Result<BlockHash, CommsInterfaceError> {
        let block_hash = block.hash();
        let block_height = block.header.height;

        info!(
            target: LOG_TARGET,
            "Block #{} ({}) received from {}",
            block_height,
            block_hash.to_hex(),
            source_peer
                .as_ref()
                .map(|p| format!("remote peer: {}", p))
                .unwrap_or_else(|| "local services".to_string())
        );
        debug!(target: LOG_TARGET, "Incoming block: {}", block);
        let timer = Instant::now();
        let block = self.hydrate_block(block).await?;

        let add_block_result = self.blockchain_db.add_block(block.clone()).await;
        // Create block event on block event stream
        match add_block_result {
            Ok(block_add_result) => {
                debug!(
                    target: LOG_TARGET,
                    "Block #{} ({}) added ({}) to blockchain in {:.2?}",
                    block_height,
                    block_hash.to_hex(),
                    block_add_result,
                    timer.elapsed()
                );

                let should_propagate = match &block_add_result {
                    BlockAddResult::Ok(_) => true,
                    BlockAddResult::BlockExists => false,
                    BlockAddResult::OrphanBlock => false,
                    BlockAddResult::ChainReorg { .. } => true,
                };

                #[cfg(feature = "metrics")]
                self.update_block_result_metrics(&block_add_result).await?;

                self.publish_block_event(BlockEvent::ValidBlockAdded(block.clone(), block_add_result));

                if should_propagate {
                    debug!(
                        target: LOG_TARGET,
                        "Propagate block ({}) to network.",
                        block_hash.to_hex()
                    );
                    let exclude_peers = source_peer.into_iter().collect();
                    let new_block_msg = NewBlock::from(&*block);
                    if let Err(e) = self.outbound_nci.propagate_block(new_block_msg, exclude_peers).await {
                        warn!(
                            target: LOG_TARGET,
                            "Failed to propagate block ({}) to network: {}.",
                            block_hash.to_hex(), e
                        );
                    }
                }
                Ok(block_hash)
            },

            Err(e @ ChainStorageError::ValidationError { .. }) => {
                #[cfg(feature = "metrics")]
                {
                    let block_hash = block.hash();
                    metrics::rejected_blocks(block.header.height, &block_hash).inc();
                }
                warn!(
                    target: LOG_TARGET,
                    "Peer {} sent an invalid block: {}",
                    source_peer
                        .as_ref()
                        .map(ToString::to_string)
                        .unwrap_or_else(|| "<local request>".to_string()),
                    e
                );
                self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
                Err(e.into())
            },

            Err(e) => {
                #[cfg(feature = "metrics")]
                metrics::rejected_blocks(block.header.height, &block.hash()).inc();

                self.publish_block_event(BlockEvent::AddBlockErrored { block });
                Err(e.into())
            },
        }
    }
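
The propagation rule applied above can be isolated as a small decision function. This sketch assumes a payload-free `AddResult` enum as a hypothetical mirror of `BlockAddResult`, and `String` peer identifiers in place of `NodeId`.

```rust
/// Hypothetical, payload-free mirror of the add-block outcomes.
pub enum AddResult {
    Added,
    BlockExists,
    OrphanBlock,
    ChainReorg,
}

/// Only blocks that changed the best chain are worth announcing to other peers.
pub fn should_propagate(result: &AddResult) -> bool {
    matches!(result, AddResult::Added | AddResult::ChainReorg)
}

/// The sender already has the block, so it is the only peer excluded.
/// A locally produced block (`None`) excludes nobody.
pub fn exclude_peers(source_peer: Option<String>) -> Vec<String> {
    source_peer.into_iter().collect()
}
```

Collecting from `Option::into_iter` yields a vector with zero or one element, which is why the handler needs no explicit `match` on `source_peer`.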

    async fn hydrate_block(&mut self, block: Block) -> Result<Arc<Block>, CommsInterfaceError> {
        let block_hash = block.hash();
        let block_height = block.header.height;
        if block.body.inputs().is_empty() {
            debug!(
                target: LOG_TARGET,
                "Block #{} ({}) contains no inputs so nothing to hydrate",
                block_height,
                block_hash.to_hex(),
            );
            return Ok(Arc::new(block));
        }

        let timer = Instant::now();
        let (header, mut inputs, outputs, kernels) = block.dissolve();

        let db = self.blockchain_db.inner().db_read_access()?;
        for input in &mut inputs {
            if !input.is_compact() {
                continue;
            }

            let output_mined_info =
                db.fetch_output(&input.output_hash())?
                    .ok_or_else(|| CommsInterfaceError::InvalidFullBlock {
                        hash: block_hash,
                        details: format!("Output {} to be spent does not exist in db", input.output_hash()),
                    })?;

            input.add_output_data(output_mined_info.output);
        }
        debug!(
            target: LOG_TARGET,
            "Hydrated block #{} ({}) with {} input(s) in {:.2?}",
            block_height,
            block_hash.to_hex(),
            inputs.len(),
            timer.elapsed()
        );
        let block = Block::new(header, AggregateBody::new(inputs, outputs, kernels));
        Ok(Arc::new(block))
    }
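
Hydration as performed above replaces each compact input with the full output data it refers to. A minimal sketch of that idea, assuming a `HashMap` as the output store, `u64` output hashes, and a hypothetical two-variant `Input` enum instead of the crate's transaction input type:

```rust
use std::collections::HashMap;

/// Hypothetical input: either a bare reference to an output or the output data itself.
#[derive(Debug, Clone, PartialEq)]
pub enum Input {
    Compact { output_hash: u64 },
    Full { output_hash: u64, data: String },
}

/// Fills in every compact input from `db`, failing on the first unknown output.
pub fn hydrate(inputs: &mut [Input], db: &HashMap<u64, String>) -> Result<(), String> {
    for input in inputs.iter_mut() {
        // Inputs that already carry their data are left untouched.
        let hash = match input {
            Input::Compact { output_hash } => *output_hash,
            Input::Full { .. } => continue,
        };
        let data = db
            .get(&hash)
            .ok_or_else(|| format!("Output {} to be spent does not exist in db", hash))?;
        *input = Input::Full { output_hash: hash, data: data.clone() };
    }
    Ok(())
}
```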

    fn publish_block_event(&self, event: BlockEvent) {
        if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
            debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
        }
    }

    #[cfg(feature = "metrics")]
    async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
        fn update_target_difficulty(block: &ChainBlock) {
            match block.header().pow_algo() {
                PowAlgorithm::Sha3x => {
                    metrics::target_difficulty_sha()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
                PowAlgorithm::RandomXM => {
                    metrics::target_difficulty_monero_randomx()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
                PowAlgorithm::RandomXT => {
                    metrics::target_difficulty_tari_randomx()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
                PowAlgorithm::Cuckaroo => {
                    metrics::target_difficulty_cuckaroo()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
            }
        }

        match block_add_result {
            BlockAddResult::Ok(ref block) => {
                update_target_difficulty(block);
                #[allow(clippy::cast_possible_wrap)]
                metrics::tip_height().set(block.height() as i64);
                let utxo_set_size = self.blockchain_db.utxo_count().await?;
                metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
            },
            BlockAddResult::ChainReorg { added, removed } => {
                if let Some(fork_height) = added.last().map(|b| b.height()) {
                    #[allow(clippy::cast_possible_wrap)]
                    metrics::tip_height().set(fork_height as i64);
                    metrics::reorg(fork_height, added.len(), removed.len()).inc();

                    let utxo_set_size = self.blockchain_db.utxo_count().await?;
                    metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
                }
                for block in added {
                    update_target_difficulty(block);
                }
            },
            BlockAddResult::OrphanBlock => {
                metrics::orphaned_blocks().inc();
            },
            _ => {},
        }
        Ok(())
    }
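
The gauge updates above convert unsigned values with `i64::try_from(..).unwrap_or(i64::MAX)`, which clamps where a plain `as` cast would wrap. A short sketch of the difference, using a hypothetical `to_gauge` helper:

```rust
use std::convert::TryFrom;

/// Hypothetical helper: convert an unsigned value for a signed gauge,
/// clamping to `i64::MAX` instead of wrapping to a negative number.
pub fn to_gauge(value: u64) -> i64 {
    i64::try_from(value).unwrap_or(i64::MAX)
}

/// For comparison only: the wrapping behaviour of a plain cast.
#[allow(clippy::cast_possible_wrap)]
pub fn to_gauge_wrapping(value: u64) -> i64 {
    value as i64
}
```

Values that fit in an `i64` are identical under both conversions; only values above `i64::MAX` differ.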

    async fn get_target_difficulty_for_next_block(
        &self,
        pow_algo: PowAlgorithm,
        constants: &ConsensusConstants,
        current_block_hash: HashOutput,
    ) -> Result<Difficulty, CommsInterfaceError> {
        let target_difficulty = self
            .blockchain_db
            .fetch_target_difficulty_for_next_block(pow_algo, current_block_hash)
            .await?;

        let target = target_difficulty.calculate(
            constants.min_pow_difficulty(pow_algo),
            constants.max_pow_difficulty(pow_algo),
        );
        trace!(target: LOG_TARGET, "Target difficulty {} for PoW {}", target, pow_algo);
        Ok(target)
    }

    pub async fn get_last_seen_hash(&self) -> Result<FixedHash, CommsInterfaceError> {
        self.mempool.get_last_seen_hash().await.map_err(|e| e.into())
    }
}

impl<B> Clone for InboundNodeCommsHandlers<B> {
    fn clone(&self) -> Self {
        Self {
            block_event_sender: self.block_event_sender.clone(),
            blockchain_db: self.blockchain_db.clone(),
            mempool: self.mempool.clone(),
            consensus_manager: self.consensus_manager.clone(),
            list_of_reconciling_blocks: self.list_of_reconciling_blocks.clone(),
            outbound_nci: self.outbound_nci.clone(),
            connectivity: self.connectivity.clone(),
            randomx_factory: self.randomx_factory.clone(),
        }
    }
}