tari-project / tari · build 19929707289 (push, via github)
04 Dec 2025 12:55PM UTC · coverage: 60.418% (-0.1%) from 60.517%
Commit: SWvheerden, "chore: new release v5.2.0-pre.7"
70282 of 116327 relevant lines covered (60.42%) · 225067.75 hits per line

Source file (46.7% covered): /base_layer/core/src/base_node/comms_interface/inbound_handlers.rs

// Copyright 2019. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#[cfg(feature = "metrics")]
use std::convert::{TryFrom, TryInto};
use std::{cmp::max, collections::HashSet, sync::Arc, time::Instant};

use log::*;
use strum_macros::Display;
use tari_common_types::types::{BlockHash, FixedHash, HashOutput};
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
use tari_node_components::blocks::{
    Block,
    BlockBuilder,
    BlockHeader,
    BlockHeaderValidationError,
    ChainBlock,
    NewBlock,
    NewBlockTemplate,
};
use tari_transaction_components::{
    aggregated_body::AggregateBody,
    consensus::ConsensusConstants,
    tari_proof_of_work::{Difficulty, PowAlgorithm, PowError},
};
use tari_utilities::hex::Hex;
use tokio::sync::RwLock;

#[cfg(feature = "metrics")]
use crate::base_node::metrics;
use crate::{
    base_node::comms_interface::{
        comms_response::ValidatorNodeChange,
        error::CommsInterfaceError,
        local_interface::BlockEventSender,
        FetchMempoolTransactionsResponse,
        NodeCommsRequest,
        NodeCommsResponse,
        OutboundNodeCommsInterface,
    },
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError},
    consensus::BaseNodeConsensusManager,
    mempool::Mempool,
    proof_of_work::{
        cuckaroo_pow::cuckaroo_difficulty,
        monero_randomx_difficulty,
        randomx_factory::RandomXFactory,
        sha3x_difficulty,
        tari_randomx_difficulty,
    },
    validation::{helpers, tari_rx_vm_key_height, ValidationError},
};

const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
const MAX_REQUEST_BY_BLOCK_HASHES: usize = 100;
const MAX_REQUEST_BY_KERNEL_EXCESS_SIGS: usize = 100;
const MAX_REQUEST_BY_UTXO_HASHES: usize = 100;
const MAX_MEMPOOL_TIMEOUT: u64 = 150;
#[cfg(feature = "metrics")]
const DIFF_INDICATOR_LAG: u64 = 25;

/// Events that can be published on the Validated Block Event Stream
/// Broadcast is to notify subscribers if this is a valid propagated block event
#[derive(Debug, Clone, Display)]
pub enum BlockEvent {
    ValidBlockAdded(Arc<Block>, BlockAddResult),
    AddBlockValidationFailed {
        block: Arc<Block>,
        source_peer: Option<NodeId>,
    },
    AddBlockErrored {
        block: Arc<Block>,
    },
    BlockSyncComplete(Arc<ChainBlock>, u64),
    BlockSyncRewind(Vec<Arc<ChainBlock>>),
}

/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
pub struct InboundNodeCommsHandlers<B> {
    block_event_sender: BlockEventSender,
    blockchain_db: AsyncBlockchainDb<B>,
    mempool: Mempool,
    consensus_manager: BaseNodeConsensusManager,
    list_of_reconciling_blocks: Arc<RwLock<HashSet<HashOutput>>>,
    outbound_nci: OutboundNodeCommsInterface,
    connectivity: ConnectivityRequester,
    randomx_factory: RandomXFactory,
}

impl<B> InboundNodeCommsHandlers<B>
where B: BlockchainBackend + 'static
{
    /// Construct a new InboundNodeCommsInterface.
    pub fn new(
        block_event_sender: BlockEventSender,
        blockchain_db: AsyncBlockchainDb<B>,
        mempool: Mempool,
        consensus_manager: BaseNodeConsensusManager,
        outbound_nci: OutboundNodeCommsInterface,
        connectivity: ConnectivityRequester,
        randomx_factory: RandomXFactory,
    ) -> Self {
        Self {
            block_event_sender,
            blockchain_db,
            mempool,
            consensus_manager,
            list_of_reconciling_blocks: Arc::new(RwLock::new(HashSet::new())),
            outbound_nci,
            connectivity,
            randomx_factory,
        }
    }

    /// Handle inbound node comms requests from remote nodes and local services.
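    ///
    /// # Example (sketch)
    ///
    /// A minimal illustration of dispatching a request, assuming an already constructed
    /// `handlers: InboundNodeCommsHandlers<B>` (a hypothetical variable, not defined in this file):
    ///
    /// ```ignore
    /// let response = handlers.handle_request(NodeCommsRequest::GetChainMetadata).await?;
    /// if let NodeCommsResponse::ChainMetadata(metadata) = response {
    ///     println!("Best block height: {}", metadata.best_block_height());
    /// }
    /// ```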
    #[allow(clippy::too_many_lines)]
    pub async fn handle_request(&self, request: NodeCommsRequest) -> Result<NodeCommsResponse, CommsInterfaceError> {
        trace!(target: LOG_TARGET, "Handling remote request {request}");
        match request {
            NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata(
                self.blockchain_db.get_chain_metadata().await?,
            )),
            NodeCommsRequest::GetTargetDifficultyNextBlock(algo) => {
                let header = self.blockchain_db.fetch_tip_header().await?;
                let constants = self.consensus_manager.consensus_constants(header.header().height);
                let target_difficulty = self
                    .get_target_difficulty_for_next_block(algo, constants, *header.hash())
                    .await?;
                Ok(NodeCommsResponse::TargetDifficulty(target_difficulty))
            },
            NodeCommsRequest::FetchHeaders(range) => {
                let headers = self.blockchain_db.fetch_chain_headers(range).await?;
                Ok(NodeCommsResponse::BlockHeaders(headers))
            },
            NodeCommsRequest::FetchHeadersByHashes(block_hashes) => {
                if block_hashes.len() > MAX_REQUEST_BY_BLOCK_HASHES {
                    return Err(CommsInterfaceError::InvalidRequest {
                        request: "FetchHeadersByHashes",
                        details: format!(
                            "Exceeded maximum block hashes request (max: {}, got:{})",
                            MAX_REQUEST_BY_BLOCK_HASHES,
                            block_hashes.len()
                        ),
                    });
                }
                let mut block_headers = Vec::with_capacity(block_hashes.len());
                for block_hash in block_hashes {
                    let block_hex = block_hash.to_hex();
                    match self.blockchain_db.fetch_chain_header_by_block_hash(block_hash).await? {
                        Some(block_header) => {
                            block_headers.push(block_header);
                        },
                        None => {
                            error!(target: LOG_TARGET, "Could not fetch headers with hashes:{block_hex}");
                            return Err(CommsInterfaceError::InternalError(format!(
                                "Could not fetch headers with hashes:{block_hex}"
                            )));
                        },
                    }
                }
                Ok(NodeCommsResponse::BlockHeaders(block_headers))
            },
            NodeCommsRequest::FetchMatchingUtxos(utxo_hashes) => {
                let mut res = Vec::with_capacity(utxo_hashes.len());
                for (output, spent) in (self
                    .blockchain_db
                    .fetch_outputs_with_spend_status_at_tip(utxo_hashes)
                    .await?)
                    .into_iter()
                    .flatten()
                {
                    if !spent {
                        res.push(output);
                    }
                }
                Ok(NodeCommsResponse::TransactionOutputs(res))
            },
            NodeCommsRequest::FetchMatchingBlocks { range, compact } => {
                let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
            },
            NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => {
                if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS {
                    return Err(CommsInterfaceError::InvalidRequest {
                        request: "FetchBlocksByKernelExcessSigs",
                        details: format!(
                            "Exceeded maximum number of kernel excess sigs in request (max: {}, got:{})",
                            MAX_REQUEST_BY_KERNEL_EXCESS_SIGS,
                            excess_sigs.len()
                        ),
                    });
                }
                let mut blocks = Vec::with_capacity(excess_sigs.len());
                for sig in excess_sigs {
                    let sig_hex = sig.get_signature().to_hex();
                    debug!(
                        target: LOG_TARGET,
                        "A peer has requested a block with kernel with sig {sig_hex}"
                    );
                    match self.blockchain_db.fetch_block_with_kernel(sig).await {
                        Ok(Some(block)) => blocks.push(block),
                        Ok(None) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block containing kernel with sig {sig_hex} to peer because not \
                             stored"
                        ),
                        Err(e) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block containing kernel with sig {sig_hex} to peer because: {e}"
                        ),
                    }
                }
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
            },
            NodeCommsRequest::FetchBlocksByUtxos(commitments) => {
                if commitments.len() > MAX_REQUEST_BY_UTXO_HASHES {
                    return Err(CommsInterfaceError::InvalidRequest {
                        request: "FetchBlocksByUtxos",
                        details: format!(
                            "Exceeded maximum number of utxo hashes in request (max: {}, got:{})",
                            MAX_REQUEST_BY_UTXO_HASHES,
                            commitments.len()
                        ),
                    });
                }
                let mut blocks = Vec::with_capacity(commitments.len());
                for commitment in commitments {
                    let commitment_hex = commitment.to_hex();
                    debug!(
                        target: LOG_TARGET,
                        "A peer has requested a block with commitment {commitment_hex}",
                    );
                    match self.blockchain_db.fetch_block_with_utxo(commitment).await {
                        Ok(Some(block)) => blocks.push(block),
                        Ok(None) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block with commitment {commitment_hex} because not stored"
                        ),
                        Err(e) => warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block with commitment {commitment_hex} because: {e}"
                        ),
                    }
                }
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
            },
            NodeCommsRequest::GetHeaderByHash(hash) => {
                let header = self.blockchain_db.fetch_chain_header_by_block_hash(hash).await?;
                Ok(NodeCommsResponse::BlockHeader(header))
            },
            NodeCommsRequest::GetBlockByHash(hash) => {
                let block = self.blockchain_db.fetch_block_by_hash(hash, false).await?;
                Ok(NodeCommsResponse::HistoricalBlock(Box::new(block)))
            },
            NodeCommsRequest::GetNewBlockTemplate(request) => {
                let best_block_header = self.blockchain_db.fetch_tip_header().await?;
                let mut last_seen_hash = self.mempool.get_last_seen_hash().await?;
                let mut is_mempool_synced = false;
                let start = Instant::now();
                // this will wait a max of 150ms by default before returning anyway with a potentially broken template
                // We need to ensure the mempool has seen the latest base node height before we can be confident the
                // template is correct
                while !is_mempool_synced && start.elapsed().as_millis() < MAX_MEMPOOL_TIMEOUT.into() {
                    if best_block_header.hash() == &last_seen_hash || last_seen_hash == FixedHash::default() {
                        is_mempool_synced = true;
                    } else {
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                        last_seen_hash = self.mempool.get_last_seen_hash().await?;
                    }
                }

                if !is_mempool_synced {
                    warn!(
                        target: LOG_TARGET,
                        "Mempool out of sync - last seen hash '{}' does not match the tip hash '{}'. This condition \
                         should auto correct with the next block template request",
                        last_seen_hash, best_block_header.hash()
                    );
                }
                let mut header = BlockHeader::from_previous(best_block_header.header());
                let constants = self.consensus_manager.consensus_constants(header.height);
                header.version = constants.blockchain_version().into();
                header.pow.pow_algo = request.algo;

                let constants_weight = constants.max_block_transaction_weight();
                let asking_weight = if request.max_weight > constants_weight || request.max_weight == 0 {
                    constants_weight
                } else {
                    request.max_weight
                };

                debug!(
                    target: LOG_TARGET,
                    "Fetching transactions with a maximum weight of {asking_weight} for the template"
                );
                let transactions = self
                    .mempool
                    .retrieve(asking_weight)
                    .await?
                    .into_iter()
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
                    .collect::<Vec<_>>();

                debug!(
                    target: LOG_TARGET,
                    "Adding {} transaction(s) to new block template",
                    transactions.len(),
                );

                let prev_hash = header.prev_hash;
                let height = header.height;

                let block = header.into_builder().with_transactions(transactions).build();
                let block_hash = block.hash();
                let block_template = NewBlockTemplate::from_block(
                    block,
                    self.get_target_difficulty_for_next_block(request.algo, constants, prev_hash)
                        .await?,
                    self.consensus_manager.get_block_reward_at(height),
                    is_mempool_synced,
                )?;

                debug!(target: LOG_TARGET,
                    "New block template requested and prepared at height: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
                    block_template.header.height,
                    block_template.target_difficulty,
                    block_hash.to_hex(),
                    block_template
                        .body
                        .calculate_weight(constants.transaction_weight_params())
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
                    block_template.body.to_counts_string()
                );

                Ok(NodeCommsResponse::NewBlockTemplate(block_template))
            },
            NodeCommsRequest::GetNewBlock(block_template) => {
                let height = block_template.header.height;
                let target_difficulty = block_template.target_difficulty;
                let block = self.blockchain_db.prepare_new_block(block_template).await?;
                let constants = self.consensus_manager.consensus_constants(block.header.height);
                debug!(target: LOG_TARGET,
                    "Prepared block: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
                    height,
                    target_difficulty,
                    block.hash().to_hex(),
                    block
                        .body
                        .calculate_weight(constants.transaction_weight_params())
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
                    block.body.to_counts_string()
                );
                Ok(NodeCommsResponse::NewBlock {
                    success: true,
                    error: None,
                    block: Some(block),
                })
            },
            NodeCommsRequest::GetBlockFromAllChains(hash) => {
                let block_hex = hash.to_hex();
                debug!(
                    target: LOG_TARGET,
                    "A peer has requested a block with hash {block_hex}"
                );

                #[allow(clippy::blocks_in_conditions)]
                let maybe_block = match self
                    .blockchain_db
                    .fetch_block_by_hash(hash, true)
                    .await
                    .unwrap_or_else(|e| {
                        warn!(
                            target: LOG_TARGET,
                            "Could not provide requested block {block_hex} to peer because: {e}",
                        );

                        None
                    }) {
                    None => self.blockchain_db.fetch_orphan(hash).await.map_or_else(
                        |e| {
                            warn!(
                                target: LOG_TARGET,
                                "Could not provide requested block {block_hex} to peer because: {e}"
                            );

                            None
                        },
                        Some,
                    ),
                    Some(block) => Some(block.into_block()),
                };

                Ok(NodeCommsResponse::Block(Box::new(maybe_block)))
            },
            NodeCommsRequest::FetchKernelByExcessSig(signature) => {
                let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await {
                    Ok(Some((kernel, _))) => vec![kernel],
                    Ok(None) => vec![],
                    Err(err) => {
                        error!(target: LOG_TARGET, "Could not fetch kernel {err}");
                        return Err(err.into());
                    },
                };

                Ok(NodeCommsResponse::TransactionKernels(kernels))
            },
            NodeCommsRequest::FetchMempoolTransactionsByExcessSigs { excess_sigs } => {
                let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
                Ok(NodeCommsResponse::FetchMempoolTransactionsByExcessSigsResponse(
                    FetchMempoolTransactionsResponse {
                        transactions,
                        not_found,
                    },
                ))
            },
            NodeCommsRequest::FetchValidatorNodesKeys {
                height,
                validator_network,
            } => {
                let active_validator_nodes = self
                    .blockchain_db
                    .fetch_active_validator_nodes(height, validator_network)
                    .await?;
                Ok(NodeCommsResponse::FetchValidatorNodesKeysResponse(
                    active_validator_nodes,
                ))
            },
            NodeCommsRequest::GetValidatorNode {
                sidechain_id,
                public_key,
            } => {
                let vn = self.blockchain_db.get_validator_node(sidechain_id, public_key).await?;
                Ok(NodeCommsResponse::GetValidatorNode(vn))
            },
            NodeCommsRequest::FetchTemplateRegistrations {
                start_height,
                end_height,
            } => {
                let template_registrations = self
                    .blockchain_db
                    .fetch_template_registrations(start_height..=end_height)
                    .await?;
                Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
                    template_registrations,
                ))
            },
            NodeCommsRequest::FetchUnspentUtxosInBlock { block_hash } => {
                let utxos = self.blockchain_db.fetch_outputs_in_block(block_hash).await?;
                Ok(NodeCommsResponse::TransactionOutputs(utxos))
            },
            NodeCommsRequest::FetchMinedInfoByPayRef(payref) => {
                let output_info = self.blockchain_db.fetch_mined_info_by_payref(payref).await?;
                Ok(NodeCommsResponse::MinedInfo(output_info))
            },
            NodeCommsRequest::FetchMinedInfoByOutputHash(output_hash) => {
                let output_info = self.blockchain_db.fetch_mined_info_by_output_hash(output_hash).await?;
                Ok(NodeCommsResponse::MinedInfo(output_info))
            },
            NodeCommsRequest::FetchOutputMinedInfo(output_hash) => {
                let output_info = self.blockchain_db.fetch_output(output_hash).await?;
                Ok(NodeCommsResponse::OutputMinedInfo(output_info))
            },
            NodeCommsRequest::CheckOutputSpentStatus(output_hash) => {
                let input_info = self.blockchain_db.fetch_input(output_hash).await?;
                Ok(NodeCommsResponse::InputMinedInfo(input_info))
            },
            NodeCommsRequest::FetchValidatorNodeChanges { epoch, sidechain_id } => {
                let added_validators = self
                    .blockchain_db
                    .fetch_validators_activating_in_epoch(sidechain_id.clone(), epoch)
                    .await?;

                let exit_validators = self
                    .blockchain_db
                    .fetch_validators_exiting_in_epoch(sidechain_id.clone(), epoch)
                    .await?;

                info!(
                    target: LOG_TARGET,
                    "Fetched {} validators activating and {} validators exiting in epoch {}",
                    added_validators.len(),
                    exit_validators.len(),
                    epoch,
                );

                let mut node_changes = Vec::with_capacity(added_validators.len() + exit_validators.len());

                node_changes.extend(added_validators.into_iter().map(|vn| ValidatorNodeChange::Add {
                    registration: vn.original_registration.into(),
                    activation_epoch: vn.activation_epoch,
                    minimum_value_promise: vn.minimum_value_promise,
                    shard_key: vn.shard_key,
                }));

                node_changes.extend(exit_validators.into_iter().map(|vn| ValidatorNodeChange::Remove {
                    public_key: vn.public_key,
                }));

                Ok(NodeCommsResponse::FetchValidatorNodeChangesResponse(node_changes))
            },
        }
    }

    /// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous
    /// requests for the full block.
    /// This may (asynchronously) block until the other request(s) complete or time out and so should typically be
    /// executed in a dedicated task.
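    ///
    /// # Example (sketch)
    ///
    /// A minimal illustration of the intended usage, assuming `handlers`, `new_block` and `source_peer`
    /// already exist in the caller (hypothetical names, not defined in this file):
    ///
    /// ```ignore
    /// let mut handlers = handlers.clone();
    /// tokio::spawn(async move {
    ///     if let Err(e) = handlers.handle_new_block_message(new_block, source_peer).await {
    ///         warn!(target: LOG_TARGET, "Failed to handle new block message: {e}");
    ///     }
    /// });
    /// ```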
    pub async fn handle_new_block_message(
        &mut self,
        new_block: NewBlock,
        source_peer: NodeId,
    ) -> Result<(), CommsInterfaceError> {
        let block_hash = new_block.header.hash();

        if self.blockchain_db.inner().is_add_block_disabled() {
            info!(
                target: LOG_TARGET,
                "Ignoring block message ({}) because add_block is locked",
                block_hash.to_hex()
            );
            return Ok(());
        }

        // Let's check if the block exists before we try and ask for a complete block
        if self.check_exists_and_not_bad_block(block_hash).await? {
            return Ok(());
        }

        // Let's check that the difficulty at least matches 50% of the tip header. The max difficulty drop is 16%, thus
        // 50% is way more than that, and in order to attack the node you need 50% of the mining power. We cannot check
        // the target difficulty as orphan blocks don't have a target difficulty. All we care about here is that bad
        // blocks are not free to make, and that they are more expensive to make than they are to validate. As
        // soon as a block can be linked to the main chain, a proper full proof of work check will
        // be done before any other validation.
        self.check_min_block_difficulty(&new_block).await?;

        {
            // we use a double lock to make sure we can only reconcile one unique block at a time. We may receive the
            // same block from multiple peers near-simultaneously. We should only reconcile each unique block once.
            let read_lock = self.list_of_reconciling_blocks.read().await;
            if read_lock.contains(&block_hash) {
                debug!(
                    target: LOG_TARGET,
                    "Block with hash `{}` is already being reconciled",
                    block_hash.to_hex()
                );
                return Ok(());
            }
        }
        {
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
            if self.check_exists_and_not_bad_block(block_hash).await? {
                return Ok(());
            }

            if !write_lock.insert(block_hash) {
                debug!(
                    target: LOG_TARGET,
                    "Block with hash `{}` is already being reconciled",
                    block_hash.to_hex()
                );
                return Ok(());
            }
        }

        debug!(
            target: LOG_TARGET,
            "Block with hash `{}` is unknown. Constructing block from known mempool transactions / requesting missing \
             transactions from peer '{}'.",
            block_hash.to_hex(),
            source_peer
        );

        let result = self.reconcile_and_add_block(source_peer.clone(), new_block).await;

        {
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
            write_lock.remove(&block_hash);
        }
        result?;
        Ok(())
    }

    async fn check_min_block_difficulty(&self, new_block: &NewBlock) -> Result<(), CommsInterfaceError> {
        let constants = self.consensus_manager.consensus_constants(new_block.header.height);
        let gen_hash = *self.consensus_manager.get_genesis_block().hash();
        let mut min_difficulty = constants.min_pow_difficulty(new_block.header.pow.pow_algo);
        let mut header = self.blockchain_db.fetch_last_chain_header().await?;
        loop {
            if new_block.header.pow_algo() == header.header().pow_algo() {
                min_difficulty = max(
                    header
                        .accumulated_data()
                        .target_difficulty
                        .checked_div_u64(2)
                        .unwrap_or(min_difficulty),
                    min_difficulty,
                );
                break;
            }
            if header.height() == 0 {
                break;
            }
            // we have not reached the genesis block, and the pow algo does not match, so let's go further back
            header = self
                .blockchain_db
                .fetch_chain_header(header.height().saturating_sub(1))
                .await?;
        }
        let achieved = match new_block.header.pow_algo() {
            PowAlgorithm::RandomXM => monero_randomx_difficulty(
                &new_block.header,
                &self.randomx_factory,
                &gen_hash,
                &self.consensus_manager,
            )?,
            PowAlgorithm::Sha3x => sha3x_difficulty(&new_block.header)?,
            PowAlgorithm::RandomXT => {
                let vm_key = *self
                    .blockchain_db
                    .fetch_chain_header(tari_rx_vm_key_height(header.height()))
                    .await?
                    .hash();
                tari_randomx_difficulty(&new_block.header, &self.randomx_factory, &vm_key)?
            },
            PowAlgorithm::Cuckaroo => {
                let constants = self.consensus_manager.consensus_constants(new_block.header.height);
                let cuckaroo_cycle = constants.cuckaroo_cycle_length();
                let edge_bits = constants.cuckaroo_edge_bits();
                cuckaroo_difficulty(&new_block.header, cuckaroo_cycle, edge_bits)?
            },
        };
        if achieved < min_difficulty {
            debug!(
                target: LOG_TARGET,
                "Block failed with invalid pow: {new_block}"
            );
            return Err(CommsInterfaceError::InvalidBlockHeader(
                BlockHeaderValidationError::ProofOfWorkError(PowError::AchievedDifficultyBelowMin {
                    minimum: min_difficulty,
                    achieved,
                }),
            ));
        }
        Ok(())
    }

    async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
        if self.blockchain_db.chain_header_or_orphan_exists(block).await? {
            debug!(
                target: LOG_TARGET,
                "Block with hash `{}` already stored",
                block.to_hex()
            );
            return Ok(true);
        }
        let (is_bad_block, reason) = self.blockchain_db.bad_block_exists(block).await?;
        if is_bad_block {
            debug!(
                target: LOG_TARGET,
                "Block with hash `{}` already validated as a bad block due to `{}`",
                block.to_hex(), reason
            );
            return Err(CommsInterfaceError::ChainStorageError(
                ChainStorageError::ValidationError {
                    source: ValidationError::BadBlockFound {
                        hash: block.to_hex(),
                        reason,
                    },
                },
            ));
        }
        Ok(false)
    }

    async fn reconcile_and_add_block(
        &mut self,
        source_peer: NodeId,
        new_block: NewBlock,
    ) -> Result<(), CommsInterfaceError> {
        let block = self.reconcile_block(source_peer.clone(), new_block).await?;
        self.handle_block(block, Some(source_peer)).await?;
        Ok(())
    }

    #[allow(clippy::too_many_lines)]
    async fn reconcile_block(
        &mut self,
        source_peer: NodeId,
        new_block: NewBlock,
    ) -> Result<Block, CommsInterfaceError> {
        let NewBlock {
            header,
            coinbase_kernels,
            coinbase_outputs,
            kernel_excess_sigs: excess_sigs,
        } = new_block;
        // If the block is empty, we don't have to ask for the block, as we already have the full block available
        // to us.
        if excess_sigs.is_empty() {
            let block = BlockBuilder::new(header.version)
                .add_outputs(coinbase_outputs)
                .add_kernels(coinbase_kernels)
                .with_header(header)
                .build();
            return Ok(block);
        }

        let block_hash = header.hash();
        // We check the current tip and orphan status of the block because we cannot guarantee that mempool state is
        // correct and the mmr root calculation is only valid if the block is building on the tip.
        let current_meta = self.blockchain_db.get_chain_metadata().await?;
        if header.prev_hash != *current_meta.best_block_hash() {
            debug!(
                target: LOG_TARGET,
                "Orphaned block #{}: ({}), current tip is: #{} ({}). We need to fetch the complete block from peer: \
                 ({})",
                header.height,
                block_hash.to_hex(),
                current_meta.best_block_height(),
                current_meta.best_block_hash().to_hex(),
                source_peer,
            );
            #[allow(clippy::cast_possible_wrap)]
            #[cfg(feature = "metrics")]
            metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
            return Ok(block);
        }

        // We know that the block is neither an orphan nor a coinbase, so let's ask our mempool for the transactions
        let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
        let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);

        let mut builder = BlockBuilder::new(header.version)
            .add_outputs(coinbase_outputs)
            .add_kernels(coinbase_kernels)
            .with_transactions(known_transactions);

        if missing_excess_sigs.is_empty() {
            debug!(
                target: LOG_TARGET,
                "All transactions for block #{} ({}) found in mempool",
                header.height,
                block_hash.to_hex()
            );
        } else {
            debug!(
                target: LOG_TARGET,
                "Requesting {} unknown transaction(s) from peer '{}'.",
                missing_excess_sigs.len(),
                source_peer
            );

            let FetchMempoolTransactionsResponse {
                transactions,
                not_found,
            } = self
                .outbound_nci
                .request_transactions_by_excess_sig(source_peer.clone(), missing_excess_sigs)
                .await?;

            // Add returned transactions to unconfirmed pool
            if !transactions.is_empty() {
                self.mempool.insert_all(transactions.clone()).await?;
            }

            if !not_found.is_empty() {
                warn!(
                    target: LOG_TARGET,
                    "Peer {} was not able to return all transactions for block #{} ({}). {} transaction(s) not found. \
                     Requesting full block.",
                    source_peer,
                    header.height,
                    block_hash.to_hex(),
                    not_found.len()
                );

                #[cfg(feature = "metrics")]
                metrics::compact_block_full_misses(header.height).inc();
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
                return Ok(block);
            }

            builder = builder.with_transactions(
                transactions
                    .into_iter()
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
                    .collect(),
            );
        }

        // NB: Add the header last because `with_transactions` etc. updates the current header, but we have the final one
        // already
        builder = builder.with_header(header.clone());
        let block = builder.build();

        // Perform a sanity check on the reconstructed block; if the MMR roots don't match then it's possible one or
        // more transactions in our mempool had the same excess/signature for a *different* transaction.
        // This is extremely unlikely, but still possible. In case of a mismatch, request the full block from the peer.
        let (block, mmr_roots) = match self.blockchain_db.calculate_mmr_roots(block).await {
            Err(_) => {
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
                return Ok(block);
            },
            Ok(v) => v,
        };
        if let Err(e) = helpers::check_mmr_roots(&header, &mmr_roots) {
            warn!(
                target: LOG_TARGET,
                "Reconstructed block #{} ({}) failed MMR check validation!. Requesting full block. Error: {}",
                header.height,
                block_hash.to_hex(),
                e,
            );

            #[cfg(feature = "metrics")]
            metrics::compact_block_mmr_mismatch(header.height).inc();
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
            return Ok(block);
        }

        Ok(block)
    }

    async fn request_full_block_from_peer(
        &mut self,
        source_peer: NodeId,
        block_hash: BlockHash,
    ) -> Result<Block, CommsInterfaceError> {
        match self
            .outbound_nci
            .request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone()))
            .await
        {
            Ok(Some(block)) => Ok(block),
            Ok(None) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer `{source_peer}` failed to return the block that was requested."
                );
                Err(CommsInterfaceError::InvalidPeerResponse(format!(
                    "Invalid response from peer `{source_peer}`: Peer failed to provide the block that was propagated"
                )))
            },
            Err(CommsInterfaceError::UnexpectedApiResponse) => {
                debug!(
                    target: LOG_TARGET,
                    "Peer `{source_peer}` sent unexpected API response."
                );
                Err(CommsInterfaceError::UnexpectedApiResponse)
            },
            Err(e) => Err(e),
        }
    }

    /// Handle inbound blocks from remote nodes and local services.
    ///
    /// ## Arguments
    /// block - the block to store
    /// source_peer - the peer that sent this new block message, or None if the block was generated by a local miner
    pub async fn handle_block(
        &mut self,
        block: Block,
        source_peer: Option<NodeId>,
    ) -> Result<BlockHash, CommsInterfaceError> {
        let block_hash = block.hash();
        let block_height = block.header.height;

        info!(
            target: LOG_TARGET,
            "Block #{} ({}) received from {}",
            block_height,
            block_hash.to_hex(),
            source_peer
                .as_ref()
                .map(|p| format!("remote peer: {p}"))
                .unwrap_or_else(|| "local services".to_string())
        );
        debug!(target: LOG_TARGET, "Incoming block: {block}");
        let timer = Instant::now();
        let block = self.hydrate_block(block).await?;

        let add_block_result = self.blockchain_db.add_block(block.clone()).await;
        // Create block event on block event stream
        match add_block_result {
            Ok(block_add_result) => {
                debug!(
                    target: LOG_TARGET,
                    "Block #{} ({}) added ({}) to blockchain in {:.2?}",
                    block_height,
                    block_hash.to_hex(),
                    block_add_result,
                    timer.elapsed()
                );

                let should_propagate = match &block_add_result {
                    BlockAddResult::Ok(_) => true,
                    BlockAddResult::BlockExists => false,
                    BlockAddResult::OrphanBlock => false,
                    BlockAddResult::ChainReorg { .. } => true,
                };

                #[cfg(feature = "metrics")]
                self.update_block_result_metrics(&block_add_result).await?;

                self.publish_block_event(BlockEvent::ValidBlockAdded(block.clone(), block_add_result));

                if should_propagate {
                    debug!(
                        target: LOG_TARGET,
                        "Propagate block ({}) to network.",
                        block_hash.to_hex()
                    );
                    let exclude_peers = source_peer.into_iter().collect();
                    let new_block_msg = NewBlock::from(&*block);
                    if let Err(e) = self.outbound_nci.propagate_block(new_block_msg, exclude_peers).await {
                        warn!(
                            target: LOG_TARGET,
                            "Failed to propagate block ({}) to network: {}.",
                            block_hash.to_hex(), e
                        );
                    }
                }
                Ok(block_hash)
            },

            Err(e @ ChainStorageError::ValidationError { .. }) => {
                #[cfg(feature = "metrics")]
                {
                    let block_hash = block.hash();
                    metrics::rejected_blocks(block.header.height, &block_hash).inc();
                }
                warn!(
                    target: LOG_TARGET,
                    "Peer {} sent an invalid block: {}",
                    source_peer
                        .as_ref()
                        .map(ToString::to_string)
                        .unwrap_or_else(|| "<local request>".to_string()),
                    e
                );
                self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
                Err(e.into())
            },

            Err(e) => {
                #[cfg(feature = "metrics")]
                metrics::rejected_blocks(block.header.height, &block.hash()).inc();

                self.publish_block_event(BlockEvent::AddBlockErrored { block });
                Err(e.into())
            },
        }
    }

    async fn hydrate_block(&mut self, block: Block) -> Result<Arc<Block>, CommsInterfaceError> {
        let block_hash = block.hash();
        let block_height = block.header.height;
        if block.body.inputs().is_empty() {
            debug!(
                target: LOG_TARGET,
                "Block #{} ({}) contains no inputs so nothing to hydrate",
                block_height,
                block_hash.to_hex(),
            );
            return Ok(Arc::new(block));
        }

        let timer = Instant::now();
        let (header, mut inputs, outputs, kernels) = block.dissolve();

        let db = self.blockchain_db.inner().db_read_access()?;
        for input in &mut inputs {
            if !input.is_compact() {
                continue;
            }

            let output_mined_info =
                db.fetch_output(&input.output_hash())?
                    .ok_or_else(|| CommsInterfaceError::InvalidFullBlock {
                        hash: block_hash,
                        details: format!("Output {} to be spent does not exist in db", input.output_hash()),
                    })?;

            input.add_output_data(output_mined_info.output);
        }
        debug!(
            target: LOG_TARGET,
            "Hydrated block #{} ({}) with {} input(s) in {:.2?}",
            block_height,
            block_hash.to_hex(),
            inputs.len(),
            timer.elapsed()
        );
        let block = Block::new(header, AggregateBody::new(inputs, outputs, kernels));
        Ok(Arc::new(block))
    }

    fn publish_block_event(&self, event: BlockEvent) {
        if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
            debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
        }
    }

    #[cfg(feature = "metrics")]
    async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
        fn update_target_difficulty(block: &ChainBlock) {
            match block.header().pow_algo() {
                PowAlgorithm::Sha3x => {
                    metrics::target_difficulty_sha()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
                PowAlgorithm::RandomXM => {
                    metrics::target_difficulty_monero_randomx()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
                PowAlgorithm::RandomXT => {
                    metrics::target_difficulty_tari_randomx()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
                PowAlgorithm::Cuckaroo => {
                    metrics::target_difficulty_cuckaroo()
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
                },
            }
        }

        match block_add_result {
            BlockAddResult::Ok(ref block) => {
                update_target_difficulty(block);
                self.update_difficulty_indicators(block.height()).await?;
                #[allow(clippy::cast_possible_wrap)]
                metrics::tip_height().set(block.height() as i64);
                let utxo_set_size = self.blockchain_db.utxo_count().await?;
                metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
            },
            BlockAddResult::ChainReorg { added, removed } => {
                if let Some(fork_height) = added.last().map(|b| b.height()) {
                    #[allow(clippy::cast_possible_wrap)]
                    metrics::tip_height().set(fork_height as i64);
                    metrics::reorg(fork_height, added.len(), removed.len()).inc();

                    let utxo_set_size = self.blockchain_db.utxo_count().await?;
                    metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
                }
                for block in added {
                    update_target_difficulty(block);
                    self.update_difficulty_indicators(block.height()).await?;
                }
            },
            BlockAddResult::OrphanBlock => {
                metrics::orphaned_blocks().inc();
            },
            _ => {},
        }
        Ok(())
    }

    #[cfg(feature = "metrics")]
    async fn update_difficulty_indicators(&self, tip: u64) -> Result<(), CommsInterfaceError> {
        // Use canonical height from tip where reorgs are highly unlikely
        if tip <= DIFF_INDICATOR_LAG {
            // Not enough history yet; clear or skip
            metrics::accumulated_difficulty_indicator().set(0);
            metrics::target_difficulty_indicator().set(0);
            metrics::difficulty_indicator_height().set(0);
            metrics::target_difficulty().set(0);
            metrics::accumulated_difficulty_exp2().set(0);
            metrics::accumulated_difficulty_sig53().set(0);
            metrics::accumulated_difficulty_as_f64().set(0.0);
            return Ok(());
        }
        let height = tip - DIFF_INDICATOR_LAG;
        let chain_header = self.blockchain_db.fetch_chain_header(height).await?;

        // Compute indicators in millibits as `log₂(value) * 1000` to make huge numbers fathomable in a time-series
        // graph with enough granularity
        let acc_diff_milli_bits = metrics::log2_u512(&chain_header.accumulated_data().total_accumulated_difficulty)
            .map(metrics::milli_bits)
            .unwrap_or(0);
        let target_diff_milli_bits =
            metrics::log2_u128(u128::from(chain_header.accumulated_data().target_difficulty.as_u64()))
                .map(metrics::milli_bits)
                .unwrap_or(0);
        let (acc_diff_exp2, acc_diff_sig53) =
            metrics::u512_exp2_sig53(&chain_header.accumulated_data().total_accumulated_difficulty).unwrap_or((0, 0));
        let acc_diff_as_f64 =
            metrics::approximate_u512_with_f64(&chain_header.accumulated_data().total_accumulated_difficulty)
                .unwrap_or(0.0);

        // Publish
        metrics::accumulated_difficulty_indicator().set(acc_diff_milli_bits);
        metrics::target_difficulty_indicator().set(target_diff_milli_bits);
        #[allow(clippy::cast_possible_wrap)]
        metrics::difficulty_indicator_height().set(height as i64);
        #[allow(clippy::cast_possible_wrap)]
        metrics::target_difficulty()
            .set(i64::try_from(chain_header.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
        metrics::accumulated_difficulty_exp2().set(acc_diff_exp2);
        metrics::accumulated_difficulty_sig53().set(acc_diff_sig53);
        metrics::accumulated_difficulty_as_f64().set(acc_diff_as_f64);

        Ok(())
    }

    async fn get_target_difficulty_for_next_block(
        &self,
        pow_algo: PowAlgorithm,
        constants: &ConsensusConstants,
        current_block_hash: HashOutput,
    ) -> Result<Difficulty, CommsInterfaceError> {
        let target_difficulty = self
            .blockchain_db
            .fetch_target_difficulty_for_next_block(pow_algo, current_block_hash)
            .await?;

        let target = target_difficulty.calculate(
            constants.min_pow_difficulty(pow_algo),
            constants.max_pow_difficulty(pow_algo),
        );
        trace!(target: LOG_TARGET, "Target difficulty {target} for PoW {pow_algo}");
        Ok(target)
    }

    pub async fn get_last_seen_hash(&self) -> Result<FixedHash, CommsInterfaceError> {
        self.mempool.get_last_seen_hash().await.map_err(|e| e.into())
    }
}

impl<B> Clone for InboundNodeCommsHandlers<B> {
    fn clone(&self) -> Self {
        Self {
            block_event_sender: self.block_event_sender.clone(),
            blockchain_db: self.blockchain_db.clone(),
            mempool: self.mempool.clone(),
            consensus_manager: self.consensus_manager.clone(),
            list_of_reconciling_blocks: self.list_of_reconciling_blocks.clone(),
            outbound_nci: self.outbound_nci.clone(),
            connectivity: self.connectivity.clone(),
            randomx_factory: self.randomx_factory.clone(),
        }
    }
}