tari-project / tari / 15998966652

01 Jul 2025 12:13PM UTC coverage: 71.633% (-0.05%) from 71.686%

fix!: payref migration and indexes, add grpc query via output hash (#7266)

Description
---
- Added a gRPC method to retrieve output information via output hash (see the sketch below)
- Updated the gRPC payref search to return spent and unspent output
information
- Added migration to rebuild payref indexes due to missing payrefs
(_added periodic call to `LMDBStore::resize_if_required`_)
- Fixed an error in the LMDB migration counter where it recorded a higher
completed version than it should have

Fixes #7263 
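
As a rough illustration of the new lookup surface (the covered file further down handles the corresponding `FetchMinedInfoByPayRef` and `FetchMinedInfoByOutputHash` requests), the sketch below keeps mined-output information addressable by either a payment reference or an output hash, including for spent outputs. All names other than those two request variants are hypothetical, not the Tari API.

```rust
// Hypothetical sketch only: illustrates lookups by payref or by output hash
// that also cover spent outputs. Types and method names are illustrative.
use std::collections::HashMap;

type Hash = [u8; 32];

#[derive(Clone, Debug)]
struct MinedInfo {
    mined_height: u64,
    spent: bool, // spent outputs remain queryable
}

#[derive(Default)]
struct PayRefIndex {
    by_output_hash: HashMap<Hash, MinedInfo>,
    payref_to_output: HashMap<Hash, Hash>, // payref -> output hash
}

impl PayRefIndex {
    // Same intent as the new FetchMinedInfoByOutputHash request handled below.
    fn mined_info_by_output_hash(&self, output_hash: &Hash) -> Option<&MinedInfo> {
        self.by_output_hash.get(output_hash)
    }

    // Same intent as the new FetchMinedInfoByPayRef request handled below.
    fn mined_info_by_payref(&self, payref: &Hash) -> Option<&MinedInfo> {
        self.payref_to_output
            .get(payref)
            .and_then(|hash| self.by_output_hash.get(hash))
    }
}
```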

Motivation and Context
---
- Payref information was deleted when the outputs were spent.
- With migrations, payref information was only created for the current
output set, not all outputs.
- Payref lookups for spent outputs were not possible.
- Output and payref lookups by output hash were not possible.

How Has This Been Tested?
---
- New LMDB migration tested at the system level for fresh and existing
base nodes.
- New gRPC method tested at the system level [_tested with a block explorer
upgrade_].
- General system-level testing.

Fresh base node with a restart afterwards:
```
2025-06-27 09:09:24.013840800 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] Blockchain database is at v0 (required version: 5)
2025-06-27 09:09:24.013873800 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] v1: No accumulated difficulty found for block height 14999
2025-06-27 09:09:24.013877800 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] v1: No migration to perform for version network
2025-06-27 09:09:24.013883100 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] v2: Starting PayRef migration
2025-06-27 09:09:24.013899700 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] v2: Cleared PayRef index
2025-06-27 09:09:24.038671500 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] v2: PayRef index rebuild completed
2025-06-27 09:09:24.038693200 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] Migrated database from version 2 to version 3
2025-06-27 09:09:24.039646900 [c::cs::lmdb_db::lmdb_db] INFO  [MIGRATIONS] Migrated database from ver... (continued)
```
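
The log reflects a straightforward versioned-migration pattern: read the stored schema version, apply each outstanding migration in order, and record the new version only once that step has completed (the counter fix in this PR concerns exactly that bookkeeping). Below is a minimal, generic sketch of the pattern; it is illustrative only and does not use the Tari LMDB types or the actual migration steps.

```rust
// Illustrative only: a generic versioned-migration runner, not the Tari code.
// Migration N upgrades the store from version N to N + 1; the stored version
// is bumped only after that step succeeds, never ahead of the work done.
type MigrationFn = fn(&mut Store) -> Result<(), String>;

struct Store {
    version: u32, // persisted schema version
}

fn run_migrations(store: &mut Store, migrations: &[MigrationFn]) -> Result<(), String> {
    let required = migrations.len() as u32;
    println!("Database is at v{} (required version: {})", store.version, required);
    while store.version < required {
        migrations[store.version as usize](store)?;
        store.version += 1; // record only the version actually reached
        println!(
            "Migrated database from version {} to version {}",
            store.version - 1,
            store.version
        );
    }
    Ok(())
}

// Stand-in for the v2 step in the log: clear and rebuild the PayRef index.
fn rebuild_payref_index(_store: &mut Store) -> Result<(), String> {
    Ok(())
}

fn noop(_store: &mut Store) -> Result<(), String> {
    Ok(())
}

fn main() {
    let mut store = Store { version: 0 };
    let steps: [MigrationFn; 3] = [noop, noop, rebuild_payref_index];
    run_migrations(&mut store, &steps).expect("migration failed");
}
```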

24 of 136 new or added lines in 11 files covered. (17.65%)

68 existing lines in 17 files now uncovered.

82832 of 115634 relevant lines covered (71.63%)

239334.93 hits per line

Source File: /base_layer/core/src/base_node/comms_interface/inbound_handlers.rs (51.89% covered)
1
// Copyright 2019. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
#[cfg(feature = "metrics")]
24
use std::convert::{TryFrom, TryInto};
25
use std::{cmp::max, collections::HashSet, sync::Arc, time::Instant};
26

27
use log::*;
28
use strum_macros::Display;
29
use tari_common_types::types::{BlockHash, FixedHash, HashOutput};
30
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
31
use tari_utilities::hex::Hex;
32
use tokio::sync::RwLock;
33

34
#[cfg(feature = "metrics")]
35
use crate::base_node::metrics;
36
use crate::{
37
    base_node::comms_interface::{
38
        error::CommsInterfaceError,
39
        local_interface::BlockEventSender,
40
        FetchMempoolTransactionsResponse,
41
        NodeCommsRequest,
42
        NodeCommsResponse,
43
        OutboundNodeCommsInterface,
44
    },
45
    blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate},
46
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError},
47
    consensus::{ConsensusConstants, ConsensusManager},
48
    mempool::Mempool,
49
    proof_of_work::{
50
        monero_randomx_difficulty,
51
        randomx_factory::RandomXFactory,
52
        sha3x_difficulty,
53
        tari_randomx_difficulty,
54
        Difficulty,
55
        PowAlgorithm,
56
        PowError,
57
    },
58
    transactions::aggregated_body::AggregateBody,
59
    validation::{helpers, tari_rx_vm_key_height, ValidationError},
60
};
61

62
const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
63
const MAX_REQUEST_BY_BLOCK_HASHES: usize = 100;
64
const MAX_REQUEST_BY_KERNEL_EXCESS_SIGS: usize = 100;
65
const MAX_REQUEST_BY_UTXO_HASHES: usize = 100;
66
const MAX_MEMPOOL_TIMEOUT: u64 = 150;
67

68
/// Events that can be published on the Validated Block Event Stream
69
/// Broadcast is to notify subscribers if this is a valid propagated block event
70
#[derive(Debug, Clone, Display)]
71
pub enum BlockEvent {
72
    ValidBlockAdded(Arc<Block>, BlockAddResult),
73
    AddBlockValidationFailed {
74
        block: Arc<Block>,
75
        source_peer: Option<NodeId>,
76
    },
77
    AddBlockErrored {
78
        block: Arc<Block>,
79
    },
80
    BlockSyncComplete(Arc<ChainBlock>, u64),
81
    BlockSyncRewind(Vec<Arc<ChainBlock>>),
82
}
83

84
/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
85
pub struct InboundNodeCommsHandlers<B> {
86
    block_event_sender: BlockEventSender,
87
    blockchain_db: AsyncBlockchainDb<B>,
88
    mempool: Mempool,
89
    consensus_manager: ConsensusManager,
90
    list_of_reconciling_blocks: Arc<RwLock<HashSet<HashOutput>>>,
91
    outbound_nci: OutboundNodeCommsInterface,
92
    connectivity: ConnectivityRequester,
93
    randomx_factory: RandomXFactory,
94
}
95

96
impl<B> InboundNodeCommsHandlers<B>
97
where B: BlockchainBackend + 'static
98
{
99
    /// Construct a new InboundNodeCommsInterface.
100
    pub fn new(
56✔
101
        block_event_sender: BlockEventSender,
56✔
102
        blockchain_db: AsyncBlockchainDb<B>,
56✔
103
        mempool: Mempool,
56✔
104
        consensus_manager: ConsensusManager,
56✔
105
        outbound_nci: OutboundNodeCommsInterface,
56✔
106
        connectivity: ConnectivityRequester,
56✔
107
        randomx_factory: RandomXFactory,
56✔
108
    ) -> Self {
56✔
109
        Self {
56✔
110
            block_event_sender,
56✔
111
            blockchain_db,
56✔
112
            mempool,
56✔
113
            consensus_manager,
56✔
114
            list_of_reconciling_blocks: Arc::new(RwLock::new(HashSet::new())),
56✔
115
            outbound_nci,
56✔
116
            connectivity,
56✔
117
            randomx_factory,
56✔
118
        }
56✔
119
    }
56✔
120

121
    /// Handle inbound node comms requests from remote nodes and local services.
122
    #[allow(clippy::too_many_lines)]
123
    pub async fn handle_request(&self, request: NodeCommsRequest) -> Result<NodeCommsResponse, CommsInterfaceError> {
102✔
124
        trace!(target: LOG_TARGET, "Handling remote request {}", request);
102✔
125
        match request {
102✔
126
            NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata(
127
                self.blockchain_db.get_chain_metadata().await?,
82✔
128
            )),
129
            NodeCommsRequest::GetTargetDifficultyNextBlock(algo) => {
×
130
                let header = self.blockchain_db.fetch_tip_header().await?;
×
131
                let constants = self.consensus_manager.consensus_constants(header.header().height);
×
132
                let target_difficulty = self
×
133
                    .get_target_difficulty_for_next_block(algo, constants, *header.hash())
×
134
                    .await?;
×
135
                Ok(NodeCommsResponse::TargetDifficulty(target_difficulty))
×
136
            },
137
            NodeCommsRequest::FetchHeaders(range) => {
1✔
138
                let headers = self.blockchain_db.fetch_chain_headers(range).await?;
1✔
139
                Ok(NodeCommsResponse::BlockHeaders(headers))
1✔
140
            },
141
            NodeCommsRequest::FetchHeadersByHashes(block_hashes) => {
×
142
                if block_hashes.len() > MAX_REQUEST_BY_BLOCK_HASHES {
×
143
                    return Err(CommsInterfaceError::InvalidRequest {
×
144
                        request: "FetchHeadersByHashes",
×
145
                        details: format!(
×
146
                            "Exceeded maximum block hashes request (max: {}, got:{})",
×
147
                            MAX_REQUEST_BY_BLOCK_HASHES,
×
148
                            block_hashes.len()
×
149
                        ),
×
150
                    });
×
151
                }
×
152
                let mut block_headers = Vec::with_capacity(block_hashes.len());
×
153
                for block_hash in block_hashes {
×
154
                    let block_hex = block_hash.to_hex();
×
155
                    match self.blockchain_db.fetch_chain_header_by_block_hash(block_hash).await? {
×
156
                        Some(block_header) => {
×
157
                            block_headers.push(block_header);
×
158
                        },
×
159
                        None => {
160
                            error!(target: LOG_TARGET, "Could not fetch headers with hashes:{}", block_hex);
×
161
                            return Err(CommsInterfaceError::InternalError(format!(
×
162
                                "Could not fetch headers with hashes:{}",
×
163
                                block_hex
×
164
                            )));
×
165
                        },
166
                    }
167
                }
168
                Ok(NodeCommsResponse::BlockHeaders(block_headers))
×
169
            },
170
            NodeCommsRequest::FetchMatchingUtxos(utxo_hashes) => {
1✔
171
                let mut res = Vec::with_capacity(utxo_hashes.len());
1✔
172
                for (output, spent) in (self
1✔
173
                    .blockchain_db
1✔
174
                    .fetch_outputs_with_spend_status_at_tip(utxo_hashes)
1✔
175
                    .await?)
1✔
176
                    .into_iter()
1✔
177
                    .flatten()
1✔
178
                {
179
                    if !spent {
1✔
180
                        res.push(output);
1✔
181
                    }
1✔
182
                }
183
                Ok(NodeCommsResponse::TransactionOutputs(res))
1✔
184
            },
185
            NodeCommsRequest::FetchMatchingBlocks { range, compact } => {
3✔
186
                let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
3✔
187
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
3✔
188
            },
189
            NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => {
×
190
                if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS {
×
191
                    return Err(CommsInterfaceError::InvalidRequest {
×
192
                        request: "FetchBlocksByKernelExcessSigs",
×
193
                        details: format!(
×
194
                            "Exceeded maximum number of kernel excess sigs in request (max: {}, got:{})",
×
195
                            MAX_REQUEST_BY_KERNEL_EXCESS_SIGS,
×
196
                            excess_sigs.len()
×
197
                        ),
×
198
                    });
×
199
                }
×
200
                let mut blocks = Vec::with_capacity(excess_sigs.len());
×
201
                for sig in excess_sigs {
×
202
                    let sig_hex = sig.get_signature().to_hex();
×
203
                    debug!(
×
204
                        target: LOG_TARGET,
×
205
                        "A peer has requested a block with kernel with sig {}", sig_hex
×
206
                    );
207
                    match self.blockchain_db.fetch_block_with_kernel(sig).await {
×
208
                        Ok(Some(block)) => blocks.push(block),
×
209
                        Ok(None) => warn!(
×
210
                            target: LOG_TARGET,
×
211
                            "Could not provide requested block containing kernel with sig {} to peer because not \
×
212
                             stored",
×
213
                            sig_hex
214
                        ),
215
                        Err(e) => warn!(
×
216
                            target: LOG_TARGET,
×
217
                            "Could not provide requested block containing kernel with sig {} to peer because: {}",
×
218
                            sig_hex,
×
219
                            e.to_string()
×
220
                        ),
221
                    }
222
                }
223
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
×
224
            },
225
            NodeCommsRequest::FetchBlocksByUtxos(commitments) => {
×
226
                if commitments.len() > MAX_REQUEST_BY_UTXO_HASHES {
×
227
                    return Err(CommsInterfaceError::InvalidRequest {
×
228
                        request: "FetchBlocksByUtxos",
×
229
                        details: format!(
×
230
                            "Exceeded maximum number of utxo hashes in request (max: {}, got:{})",
×
231
                            MAX_REQUEST_BY_UTXO_HASHES,
×
232
                            commitments.len()
×
233
                        ),
×
234
                    });
×
235
                }
×
236
                let mut blocks = Vec::with_capacity(commitments.len());
×
237
                for commitment in commitments {
×
238
                    let commitment_hex = commitment.to_hex();
×
239
                    debug!(
×
240
                        target: LOG_TARGET,
×
241
                        "A peer has requested a block with commitment {}", commitment_hex,
×
242
                    );
243
                    match self.blockchain_db.fetch_block_with_utxo(commitment).await {
×
244
                        Ok(Some(block)) => blocks.push(block),
×
245
                        Ok(None) => warn!(
×
246
                            target: LOG_TARGET,
×
247
                            "Could not provide requested block with commitment {} to peer because not stored",
×
248
                            commitment_hex,
249
                        ),
250
                        Err(e) => warn!(
×
251
                            target: LOG_TARGET,
×
252
                            "Could not provide requested block with commitment {} to peer because: {}",
×
253
                            commitment_hex,
×
254
                            e.to_string()
×
255
                        ),
256
                    }
257
                }
258
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
×
259
            },
260
            NodeCommsRequest::GetHeaderByHash(hash) => {
×
261
                let header = self.blockchain_db.fetch_chain_header_by_block_hash(hash).await?;
×
262
                Ok(NodeCommsResponse::BlockHeader(header))
×
263
            },
264
            NodeCommsRequest::GetBlockByHash(hash) => {
×
265
                let block = self.blockchain_db.fetch_block_by_hash(hash, false).await?;
×
266
                Ok(NodeCommsResponse::HistoricalBlock(Box::new(block)))
×
267
            },
268
            NodeCommsRequest::GetNewBlockTemplate(request) => {
3✔
269
                let best_block_header = self.blockchain_db.fetch_tip_header().await?;
3✔
270
                let mut last_seen_hash = self.mempool.get_last_seen_hash().await?;
3✔
271
                let mut is_mempool_synced = false;
3✔
272
                let start = Instant::now();
3✔
273
                // this will wait a max of 150ms by default before returning anyway with a potentially broken template
274
                // We need to ensure the mempool has seen the latest base node height before we can be confident the
275
                // template is correct
276
                while !is_mempool_synced && start.elapsed().as_millis() < MAX_MEMPOOL_TIMEOUT.into() {
6✔
277
                    if best_block_header.hash() == &last_seen_hash || last_seen_hash == FixedHash::default() {
3✔
278
                        is_mempool_synced = true;
3✔
279
                    } else {
3✔
280
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
×
281
                        last_seen_hash = self.mempool.get_last_seen_hash().await?;
×
282
                    }
283
                }
284

285
                if !is_mempool_synced {
3✔
286
                    warn!(
×
287
                        target: LOG_TARGET,
×
288
                        "Mempool out of sync - last seen hash '{}' does not match the tip hash '{}'. This condition \
×
289
                         should auto correct with the next block template request",
×
290
                        last_seen_hash, best_block_header.hash()
×
291
                    );
292
                }
3✔
293
                let mut header = BlockHeader::from_previous(best_block_header.header());
3✔
294
                let constants = self.consensus_manager.consensus_constants(header.height);
3✔
295
                header.version = constants.blockchain_version();
3✔
296
                header.pow.pow_algo = request.algo;
3✔
297

3✔
298
                let constants_weight = constants.max_block_transaction_weight();
3✔
299
                let asking_weight = if request.max_weight > constants_weight || request.max_weight == 0 {
3✔
300
                    constants_weight
3✔
301
                } else {
302
                    request.max_weight
×
303
                };
304

305
                debug!(
3✔
306
                    target: LOG_TARGET,
×
307
                    "Fetching transactions with a maximum weight of {} for the template", asking_weight
×
308
                );
309
                let transactions = self
3✔
310
                    .mempool
3✔
311
                    .retrieve(asking_weight)
3✔
312
                    .await?
3✔
313
                    .into_iter()
3✔
314
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
8✔
315
                    .collect::<Vec<_>>();
3✔
316

3✔
317
                debug!(
3✔
318
                    target: LOG_TARGET,
×
319
                    "Adding {} transaction(s) to new block template",
×
320
                    transactions.len(),
×
321
                );
322

323
                let prev_hash = header.prev_hash;
3✔
324
                let height = header.height;
3✔
325

3✔
326
                let block = header.into_builder().with_transactions(transactions).build();
3✔
327
                let block_hash = block.hash();
3✔
328
                let block_template = NewBlockTemplate::from_block(
3✔
329
                    block,
3✔
330
                    self.get_target_difficulty_for_next_block(request.algo, constants, prev_hash)
3✔
331
                        .await?,
3✔
332
                    self.consensus_manager.get_block_reward_at(height),
3✔
333
                    is_mempool_synced,
3✔
334
                )?;
×
335

336
                debug!(target: LOG_TARGET,
3✔
337
                    "New block template requested and prepared at height: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
×
338
                    block_template.header.height,
×
339
                    block_template.target_difficulty,
×
340
                    block_hash.to_hex(),
×
341
                    block_template
×
342
                        .body
×
343
                        .calculate_weight(constants.transaction_weight_params())
×
344
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
×
345
                    block_template.body.to_counts_string()
×
346
                );
347

348
                Ok(NodeCommsResponse::NewBlockTemplate(block_template))
3✔
349
            },
350
            NodeCommsRequest::GetNewBlock(block_template) => {
3✔
351
                let height = block_template.header.height;
3✔
352
                let target_difficulty = block_template.target_difficulty;
3✔
353
                let block = self.blockchain_db.prepare_new_block(block_template).await?;
3✔
354
                let constants = self.consensus_manager.consensus_constants(block.header.height);
3✔
355
                debug!(target: LOG_TARGET,
3✔
356
                    "Prepared block: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
×
357
                    height,
×
358
                    target_difficulty,
×
359
                    block.hash().to_hex(),
×
360
                    block
×
361
                        .body
×
362
                        .calculate_weight(constants.transaction_weight_params())
×
363
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
×
364
                    block.body.to_counts_string()
×
365
                );
366
                Ok(NodeCommsResponse::NewBlock {
3✔
367
                    success: true,
3✔
368
                    error: None,
3✔
369
                    block: Some(block),
3✔
370
                })
3✔
371
            },
372
            NodeCommsRequest::GetBlockFromAllChains(hash) => {
4✔
373
                let block_hex = hash.to_hex();
4✔
374
                debug!(
4✔
375
                    target: LOG_TARGET,
×
376
                    "A peer has requested a block with hash {}", block_hex
×
377
                );
378

379
                #[allow(clippy::blocks_in_conditions)]
380
                let maybe_block = match self
4✔
381
                    .blockchain_db
4✔
382
                    .fetch_block_by_hash(hash, true)
4✔
383
                    .await
4✔
384
                    .unwrap_or_else(|e| {
4✔
385
                        warn!(
×
386
                            target: LOG_TARGET,
×
387
                            "Could not provide requested block {} to peer because: {}",
×
388
                            block_hex,
×
389
                            e.to_string()
×
390
                        );
391

392
                        None
×
393
                    }) {
4✔
394
                    None => self.blockchain_db.fetch_orphan(hash).await.map_or_else(
1✔
395
                        |e| {
1✔
396
                            warn!(
1✔
397
                                target: LOG_TARGET,
×
398
                                "Could not provide requested block {} to peer because: {}", block_hex, e,
×
399
                            );
400

401
                            None
1✔
402
                        },
1✔
403
                        Some,
1✔
404
                    ),
1✔
405
                    Some(block) => Some(block.into_block()),
3✔
406
                };
407

408
                Ok(NodeCommsResponse::Block(Box::new(maybe_block)))
4✔
409
            },
410
            NodeCommsRequest::FetchKernelByExcessSig(signature) => {
1✔
411
                let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await {
1✔
412
                    Ok(Some((kernel, _))) => vec![kernel],
1✔
413
                    Ok(None) => vec![],
×
414
                    Err(err) => {
×
415
                        error!(target: LOG_TARGET, "Could not fetch kernel {}", err.to_string());
×
416
                        return Err(err.into());
×
417
                    },
418
                };
419

420
                Ok(NodeCommsResponse::TransactionKernels(kernels))
1✔
421
            },
422
            NodeCommsRequest::FetchMempoolTransactionsByExcessSigs { excess_sigs } => {
4✔
423
                let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
4✔
424
                Ok(NodeCommsResponse::FetchMempoolTransactionsByExcessSigsResponse(
4✔
425
                    FetchMempoolTransactionsResponse {
4✔
426
                        transactions,
4✔
427
                        not_found,
4✔
428
                    },
4✔
429
                ))
4✔
430
            },
431
            NodeCommsRequest::FetchValidatorNodesKeys { height } => {
×
432
                let active_validator_nodes = self.blockchain_db.fetch_active_validator_nodes(height).await?;
×
433
                Ok(NodeCommsResponse::FetchValidatorNodesKeysResponse(
×
434
                    active_validator_nodes,
×
435
                ))
×
436
            },
437
            NodeCommsRequest::GetShardKey { height, public_key } => {
×
438
                let shard_key = self.blockchain_db.get_shard_key(height, public_key).await?;
×
439
                Ok(NodeCommsResponse::GetShardKeyResponse(shard_key))
×
440
            },
441
            NodeCommsRequest::FetchTemplateRegistrations {
442
                start_height,
×
443
                end_height,
×
444
            } => {
445
                let template_registrations = self
×
446
                    .blockchain_db
×
447
                    .fetch_template_registrations(start_height..=end_height)
×
448
                    .await?;
×
449
                Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
×
450
                    template_registrations,
×
451
                ))
×
452
            },
453
            NodeCommsRequest::FetchUnspentUtxosInBlock { block_hash } => {
×
454
                let utxos = self.blockchain_db.fetch_outputs_in_block(block_hash).await?;
×
455
                Ok(NodeCommsResponse::TransactionOutputs(utxos))
×
456
            },
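            // New in this PR: mined-output info lookups by payment reference and by output hash (spent outputs included).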
NEW
457
            NodeCommsRequest::FetchMinedInfoByPayRef(payref) => {
×
NEW
458
                let output_info = self.blockchain_db.fetch_mined_info_by_payref(payref).await?;
×
NEW
459
                Ok(NodeCommsResponse::MinedInfo(output_info))
×
460
            },
NEW
461
            NodeCommsRequest::FetchMinedInfoByOutputHash(output_hash) => {
×
NEW
462
                let output_info = self.blockchain_db.fetch_mined_info_by_output_hash(output_hash).await?;
×
NEW
463
                Ok(NodeCommsResponse::MinedInfo(output_info))
×
464
            },
NEW
465
            NodeCommsRequest::FetchOutputMinedInfo(output_hash) => {
×
NEW
466
                let output_info = self.blockchain_db.fetch_output(output_hash).await?;
×
UNCOV
467
                Ok(NodeCommsResponse::OutputMinedInfo(output_info))
×
468
            },
469
            NodeCommsRequest::CheckOutputSpentStatus(output_hash) => {
×
470
                let input_info = self.blockchain_db.fetch_input(output_hash).await?;
×
471
                Ok(NodeCommsResponse::InputMinedInfo(input_info))
×
472
            },
473
        }
474
    }
102✔
475

476
    /// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous
477
    /// requests for the full block.
478
    /// This may (asynchronously) block until the other request(s) complete or time out and so should typically be
479
    /// executed in a dedicated task.
480
    pub async fn handle_new_block_message(
21✔
481
        &mut self,
21✔
482
        new_block: NewBlock,
21✔
483
        source_peer: NodeId,
21✔
484
    ) -> Result<(), CommsInterfaceError> {
21✔
485
        let block_hash = new_block.header.hash();
21✔
486

21✔
487
        if self.blockchain_db.inner().is_add_block_disabled() {
21✔
488
            info!(
×
489
                target: LOG_TARGET,
×
490
                "Ignoring block message ({}) because add_block is locked",
×
491
                block_hash.to_hex()
×
492
            );
493
            return Ok(());
×
494
        }
21✔
495

21✔
496
        // Let's check if the block exists before we try to ask for a complete block
21✔
497
        if self.check_exists_and_not_bad_block(block_hash).await? {
21✔
498
            return Ok(());
×
499
        }
21✔
500

21✔
501
        // let's check that the difficulty at least matches 50% of the tip header. The max difficulty drop is 16%, thus
21✔
502
        // 50% is way more than that and in order to attack the node, you need 50% of the mining power. We cannot check
21✔
503
        // the target difficulty as orphan blocks don't have a target difficulty. All we care about here is that bad
21✔
504
        // blocks are not free to make, and that they are more expensive to make than they are to validate. As
21✔
505
        // soon as a block can be linked to the main chain, a proper full proof of work check will
21✔
506
        // be done before any other validation.
21✔
507
        self.check_min_block_difficulty(&new_block).await?;
21✔
508

509
        {
510
            // we use a double lock to make sure we can only reconcile one unique block at a time. We may receive the
511
            // same block from multiple peer near simultaneously. We should only reconcile each unique block once.
512
            let read_lock = self.list_of_reconciling_blocks.read().await;
21✔
513
            if read_lock.contains(&block_hash) {
21✔
514
                debug!(
×
515
                    target: LOG_TARGET,
×
516
                    "Block with hash `{}` is already being reconciled",
×
517
                    block_hash.to_hex()
×
518
                );
519
                return Ok(());
×
520
            }
21✔
521
        }
522
        {
523
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
21✔
524
            if self.check_exists_and_not_bad_block(block_hash).await? {
21✔
525
                return Ok(());
×
526
            }
21✔
527

21✔
528
            if !write_lock.insert(block_hash) {
21✔
529
                debug!(
×
530
                    target: LOG_TARGET,
×
531
                    "Block with hash `{}` is already being reconciled",
×
532
                    block_hash.to_hex()
×
533
                );
534
                return Ok(());
×
535
            }
21✔
536
        }
21✔
537

21✔
538
        debug!(
21✔
539
            target: LOG_TARGET,
×
540
            "Block with hash `{}` is unknown. Constructing block from known mempool transactions / requesting missing \
×
541
             transactions from peer '{}'.",
×
542
            block_hash.to_hex(),
×
543
            source_peer
544
        );
545

546
        let result = self.reconcile_and_add_block(source_peer.clone(), new_block).await;
21✔
547

548
        {
549
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
21✔
550
            write_lock.remove(&block_hash);
21✔
551
        }
21✔
552
        result?;
21✔
553
        Ok(())
18✔
554
    }
21✔
555

556
    async fn check_min_block_difficulty(&self, new_block: &NewBlock) -> Result<(), CommsInterfaceError> {
21✔
557
        let constants = self.consensus_manager.consensus_constants(new_block.header.height);
21✔
558
        let gen_hash = *self.consensus_manager.get_genesis_block().hash();
21✔
559
        let mut min_difficulty = constants.min_pow_difficulty(new_block.header.pow.pow_algo);
21✔
560
        let mut header = self.blockchain_db.fetch_last_chain_header().await?;
21✔
561
        loop {
562
            if new_block.header.pow_algo() == header.header().pow_algo() {
21✔
563
                min_difficulty = max(
21✔
564
                    header
21✔
565
                        .accumulated_data()
21✔
566
                        .target_difficulty
21✔
567
                        .checked_div_u64(2)
21✔
568
                        .unwrap_or(min_difficulty),
21✔
569
                    min_difficulty,
21✔
570
                );
21✔
571
                break;
21✔
572
            }
×
573
            if header.height() == 0 {
×
574
                break;
×
575
            }
×
576
            // we have not reached the genesis block, and the pow algo does not match, so let's go further back
577
            header = self
×
578
                .blockchain_db
×
579
                .fetch_chain_header(header.height().saturating_sub(1))
×
580
                .await?;
×
581
        }
582
        let achieved = match new_block.header.pow_algo() {
21✔
583
            PowAlgorithm::RandomXM => monero_randomx_difficulty(
×
584
                &new_block.header,
×
585
                &self.randomx_factory,
×
586
                &gen_hash,
×
587
                &self.consensus_manager,
×
588
            )?,
×
589
            PowAlgorithm::Sha3x => sha3x_difficulty(&new_block.header)?,
21✔
590
            PowAlgorithm::RandomXT => {
591
                let vm_key = *self
×
592
                    .blockchain_db
×
593
                    .fetch_chain_header(tari_rx_vm_key_height(header.height()))
×
594
                    .await?
×
595
                    .hash();
×
596
                tari_randomx_difficulty(&new_block.header, &self.randomx_factory, &vm_key)?
×
597
            },
598
        };
599
        if achieved < min_difficulty {
21✔
600
            return Err(CommsInterfaceError::InvalidBlockHeader(
×
601
                BlockHeaderValidationError::ProofOfWorkError(PowError::AchievedDifficultyBelowMin),
×
602
            ));
×
603
        }
21✔
604
        Ok(())
21✔
605
    }
21✔
606

607
    async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
42✔
608
        if self.blockchain_db.chain_header_or_orphan_exists(block).await? {
42✔
609
            debug!(
×
610
                target: LOG_TARGET,
×
611
                "Block with hash `{}` already stored",
×
612
                block.to_hex()
×
613
            );
614
            return Ok(true);
×
615
        }
42✔
616
        let (is_bad_block, reason) = self.blockchain_db.bad_block_exists(block).await?;
42✔
617
        if is_bad_block {
42✔
618
            debug!(
×
619
                target: LOG_TARGET,
×
620
                "Block with hash `{}` already validated as a bad block due to `{}`",
×
621
                block.to_hex(), reason
×
622
            );
623
            return Err(CommsInterfaceError::ChainStorageError(
×
624
                ChainStorageError::ValidationError {
×
625
                    source: ValidationError::BadBlockFound {
×
626
                        hash: block.to_hex(),
×
627
                        reason,
×
628
                    },
×
629
                },
×
630
            ));
×
631
        }
42✔
632
        Ok(false)
42✔
633
    }
42✔
634

635
    async fn reconcile_and_add_block(
21✔
636
        &mut self,
21✔
637
        source_peer: NodeId,
21✔
638
        new_block: NewBlock,
21✔
639
    ) -> Result<(), CommsInterfaceError> {
21✔
640
        let block = self.reconcile_block(source_peer.clone(), new_block).await?;
21✔
641
        self.handle_block(block, Some(source_peer)).await?;
20✔
642
        Ok(())
18✔
643
    }
21✔
644

645
    #[allow(clippy::too_many_lines)]
646
    async fn reconcile_block(
21✔
647
        &mut self,
21✔
648
        source_peer: NodeId,
21✔
649
        new_block: NewBlock,
21✔
650
    ) -> Result<Block, CommsInterfaceError> {
21✔
651
        let NewBlock {
21✔
652
            header,
21✔
653
            coinbase_kernels,
21✔
654
            coinbase_outputs,
21✔
655
            kernel_excess_sigs: excess_sigs,
21✔
656
        } = new_block;
21✔
657
        // If the block is empty, we don't have to ask for the block, as we already have the full block available
21✔
658
        // to us.
21✔
659
        if excess_sigs.is_empty() {
21✔
660
            let block = BlockBuilder::new(header.version)
17✔
661
                .add_outputs(coinbase_outputs)
17✔
662
                .add_kernels(coinbase_kernels)
17✔
663
                .with_header(header)
17✔
664
                .build();
17✔
665
            return Ok(block);
17✔
666
        }
4✔
667

4✔
668
        let block_hash = header.hash();
4✔
669
        // We check the current tip and orphan status of the block because we cannot guarantee that mempool state is
670
        // correct and the mmr root calculation is only valid if the block is building on the tip.
671
        let current_meta = self.blockchain_db.get_chain_metadata().await?;
4✔
672
        if header.prev_hash != *current_meta.best_block_hash() {
4✔
673
            debug!(
×
674
                target: LOG_TARGET,
×
675
                "Orphaned block #{}: ({}), current tip is: #{} ({}). We need to fetch the complete block from peer: \
×
676
                 ({})",
×
677
                header.height,
×
678
                block_hash.to_hex(),
×
679
                current_meta.best_block_height(),
×
680
                current_meta.best_block_hash().to_hex(),
×
681
                source_peer,
682
            );
683
            #[allow(clippy::cast_possible_wrap)]
684
            #[cfg(feature = "metrics")]
685
            metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
×
686
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
×
687
            return Ok(block);
×
688
        }
4✔
689

690
        // We know that the block is neither an orphan nor a coinbase, so let's ask our mempool for the transactions
691
        let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
4✔
692
        let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();
4✔
693

4✔
694
        #[allow(clippy::cast_possible_wrap)]
4✔
695
        #[cfg(feature = "metrics")]
4✔
696
        metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);
4✔
697

4✔
698
        let mut builder = BlockBuilder::new(header.version)
4✔
699
            .add_outputs(coinbase_outputs)
4✔
700
            .add_kernels(coinbase_kernels)
4✔
701
            .with_transactions(known_transactions);
4✔
702

4✔
703
        if missing_excess_sigs.is_empty() {
4✔
704
            debug!(
×
705
                target: LOG_TARGET,
×
706
                "All transactions for block #{} ({}) found in mempool",
×
707
                header.height,
×
708
                block_hash.to_hex()
×
709
            );
710
        } else {
711
            debug!(
4✔
712
                target: LOG_TARGET,
×
713
                "Requesting {} unknown transaction(s) from peer '{}'.",
×
714
                missing_excess_sigs.len(),
×
715
                source_peer
716
            );
717

718
            let FetchMempoolTransactionsResponse {
719
                transactions,
4✔
720
                not_found,
4✔
721
            } = self
4✔
722
                .outbound_nci
4✔
723
                .request_transactions_by_excess_sig(source_peer.clone(), missing_excess_sigs)
4✔
724
                .await?;
4✔
725

726
            // Add returned transactions to unconfirmed pool
727
            if !transactions.is_empty() {
4✔
728
                self.mempool.insert_all(transactions.clone()).await?;
×
729
            }
4✔
730

731
            if !not_found.is_empty() {
4✔
732
                warn!(
4✔
733
                    target: LOG_TARGET,
×
734
                    "Peer {} was not able to return all transactions for block #{} ({}). {} transaction(s) not found. \
×
735
                     Requesting full block.",
×
736
                    source_peer,
×
737
                    header.height,
×
738
                    block_hash.to_hex(),
×
739
                    not_found.len()
×
740
                );
741

742
                #[cfg(feature = "metrics")]
743
                metrics::compact_block_full_misses(header.height).inc();
4✔
744
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
4✔
745
                return Ok(block);
3✔
746
            }
×
747

×
748
            builder = builder.with_transactions(
×
749
                transactions
×
750
                    .into_iter()
×
751
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
×
752
                    .collect(),
×
753
            );
×
754
        }
×
755

756
        // NB: Add the header last because `with_transactions` etc updates the current header, but we have the final one
757
        // already
758
        builder = builder.with_header(header.clone());
×
759
        let block = builder.build();
×
760

761
        // Perform a sanity check on the reconstructed block, if the MMR roots don't match then it's possible one or
762
        // more transactions in our mempool had the same excess/signature for a *different* transaction.
763
        // This is extremely unlikely, but still possible. In case of a mismatch, request the full block from the peer.
764
        let (block, mmr_roots) = match self.blockchain_db.calculate_mmr_roots(block).await {
×
765
            Err(_) => {
766
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
×
767
                return Ok(block);
×
768
            },
769
            Ok(v) => v,
×
770
        };
771
        if let Err(e) = helpers::check_mmr_roots(&header, &mmr_roots) {
×
772
            warn!(
×
773
                target: LOG_TARGET,
×
774
                "Reconstructed block #{} ({}) failed MMR check validation!. Requesting full block. Error: {}",
×
775
                header.height,
×
776
                block_hash.to_hex(),
×
777
                e,
778
            );
779

780
            #[cfg(feature = "metrics")]
781
            metrics::compact_block_mmr_mismatch(header.height).inc();
×
782
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
×
783
            return Ok(block);
×
784
        }
×
785

×
786
        Ok(block)
×
787
    }
21✔
788

789
    async fn request_full_block_from_peer(
4✔
790
        &mut self,
4✔
791
        source_peer: NodeId,
4✔
792
        block_hash: BlockHash,
4✔
793
    ) -> Result<Block, CommsInterfaceError> {
4✔
794
        match self
4✔
795
            .outbound_nci
4✔
796
            .request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone()))
4✔
797
            .await
4✔
798
        {
799
            Ok(Some(block)) => Ok(block),
3✔
800
            Ok(None) => {
801
                debug!(
1✔
802
                    target: LOG_TARGET,
×
803
                    "Peer `{}` failed to return the block that was requested.", source_peer
×
804
                );
805
                Err(CommsInterfaceError::InvalidPeerResponse(format!(
1✔
806
                    "Invalid response from peer `{}`: Peer failed to provide the block that was propagated",
1✔
807
                    source_peer
1✔
808
                )))
1✔
809
            },
810
            Err(CommsInterfaceError::UnexpectedApiResponse) => {
811
                debug!(
×
812
                    target: LOG_TARGET,
×
813
                    "Peer `{}` sent unexpected API response.", source_peer
×
814
                );
815
                Err(CommsInterfaceError::UnexpectedApiResponse)
×
816
            },
817
            Err(e) => Err(e),
×
818
        }
819
    }
4✔
820

821
    /// Handle inbound blocks from remote nodes and local services.
822
    ///
823
    /// ## Arguments
824
    /// block - the block to store
825
    /// new_block_msg - propagate this new block message
826
    /// source_peer - the peer that sent this new block message, or None if the block was generated by a local miner
827
    pub async fn handle_block(
32✔
828
        &mut self,
32✔
829
        block: Block,
32✔
830
        source_peer: Option<NodeId>,
32✔
831
    ) -> Result<BlockHash, CommsInterfaceError> {
32✔
832
        let block_hash = block.hash();
32✔
833
        let block_height = block.header.height;
32✔
834

32✔
835
        info!(
32✔
836
            target: LOG_TARGET,
×
837
            "Block #{} ({}) received from {}",
×
838
            block_height,
×
839
            block_hash.to_hex(),
×
840
            source_peer
×
841
                .as_ref()
×
842
                .map(|p| format!("remote peer: {}", p))
×
843
                .unwrap_or_else(|| "local services".to_string())
×
844
        );
845
        debug!(target: LOG_TARGET, "Incoming block: {}", block);
32✔
846
        let timer = Instant::now();
32✔
847
        let block = self.hydrate_block(block).await?;
32✔
848

849
        let add_block_result = self.blockchain_db.add_block(block.clone()).await;
32✔
850
        // Create block event on block event stream
851
        match add_block_result {
2✔
852
            Ok(block_add_result) => {
30✔
853
                debug!(
30✔
854
                    target: LOG_TARGET,
×
855
                    "Block #{} ({}) added ({}) to blockchain in {:.2?}",
×
856
                    block_height,
×
857
                    block_hash.to_hex(),
×
858
                    block_add_result,
×
859
                    timer.elapsed()
×
860
                );
861

862
                let should_propagate = match &block_add_result {
30✔
863
                    BlockAddResult::Ok(_) => true,
30✔
864
                    BlockAddResult::BlockExists => false,
×
865
                    BlockAddResult::OrphanBlock => false,
×
866
                    BlockAddResult::ChainReorg { .. } => true,
×
867
                };
868

869
                #[cfg(feature = "metrics")]
870
                self.update_block_result_metrics(&block_add_result).await?;
30✔
871

872
                self.publish_block_event(BlockEvent::ValidBlockAdded(block.clone(), block_add_result));
30✔
873

30✔
874
                if should_propagate {
30✔
875
                    debug!(
30✔
876
                        target: LOG_TARGET,
×
877
                        "Propagate block ({}) to network.",
×
878
                        block_hash.to_hex()
×
879
                    );
880
                    let exclude_peers = source_peer.into_iter().collect();
30✔
881
                    let new_block_msg = NewBlock::from(&*block);
30✔
882
                    if let Err(e) = self.outbound_nci.propagate_block(new_block_msg, exclude_peers).await {
30✔
883
                        warn!(
×
884
                            target: LOG_TARGET,
×
885
                            "Failed to propagate block ({}) to network: {}.",
×
886
                            block_hash.to_hex(), e
×
887
                        );
888
                    }
30✔
889
                }
×
890
                Ok(block_hash)
30✔
891
            },
892

893
            Err(e @ ChainStorageError::ValidationError { .. }) => {
2✔
894
                #[cfg(feature = "metrics")]
2✔
895
                {
2✔
896
                    let block_hash = block.hash();
2✔
897
                    metrics::rejected_blocks(block.header.height, &block_hash).inc();
2✔
898
                }
2✔
899
                warn!(
2✔
900
                    target: LOG_TARGET,
×
901
                    "Peer {} sent an invalid block: {}",
×
902
                    source_peer
×
903
                        .as_ref()
×
904
                        .map(ToString::to_string)
×
905
                        .unwrap_or_else(|| "<local request>".to_string()),
×
906
                    e
907
                );
908
                self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
2✔
909
                Err(e.into())
2✔
910
            },
911

912
            Err(e) => {
×
913
                #[cfg(feature = "metrics")]
×
914
                metrics::rejected_blocks(block.header.height, &block.hash()).inc();
×
915

×
916
                self.publish_block_event(BlockEvent::AddBlockErrored { block });
×
917
                Err(e.into())
×
918
            },
919
        }
920
    }
32✔
921

922
    async fn hydrate_block(&mut self, block: Block) -> Result<Arc<Block>, CommsInterfaceError> {
32✔
923
        let block_hash = block.hash();
32✔
924
        let block_height = block.header.height;
32✔
925
        if block.body.inputs().is_empty() {
32✔
926
            debug!(
24✔
927
                target: LOG_TARGET,
×
928
                "Block #{} ({}) contains no inputs so nothing to hydrate",
×
929
                block_height,
×
930
                block_hash.to_hex(),
×
931
            );
932
            return Ok(Arc::new(block));
24✔
933
        }
8✔
934

8✔
935
        let timer = Instant::now();
8✔
936
        let (header, mut inputs, outputs, kernels) = block.dissolve();
8✔
937

938
        let db = self.blockchain_db.inner().db_read_access()?;
8✔
939
        for input in &mut inputs {
17✔
940
            if !input.is_compact() {
9✔
941
                continue;
6✔
942
            }
3✔
943

944
            let output_mined_info =
3✔
945
                db.fetch_output(&input.output_hash())?
3✔
946
                    .ok_or_else(|| CommsInterfaceError::InvalidFullBlock {
3✔
947
                        hash: block_hash,
×
948
                        details: format!("Output {} to be spent does not exist in db", input.output_hash()),
×
949
                    })?;
3✔
950

951
            let rp_hash = match output_mined_info.output.proof {
3✔
952
                Some(proof) => proof.hash(),
3✔
953
                None => FixedHash::zero(),
×
954
            };
955
            input.add_output_data(
3✔
956
                output_mined_info.output.version,
3✔
957
                output_mined_info.output.features,
3✔
958
                output_mined_info.output.commitment,
3✔
959
                output_mined_info.output.script,
3✔
960
                output_mined_info.output.sender_offset_public_key,
3✔
961
                output_mined_info.output.covenant,
3✔
962
                output_mined_info.output.encrypted_data,
3✔
963
                output_mined_info.output.metadata_signature,
3✔
964
                rp_hash,
3✔
965
                output_mined_info.output.minimum_value_promise,
3✔
966
            );
3✔
967
        }
968
        debug!(
8✔
969
            target: LOG_TARGET,
×
970
            "Hydrated block #{} ({}) with {} input(s) in {:.2?}",
×
971
            block_height,
×
972
            block_hash.to_hex(),
×
973
            inputs.len(),
×
974
            timer.elapsed()
×
975
        );
976
        let block = Block::new(header, AggregateBody::new(inputs, outputs, kernels));
8✔
977
        Ok(Arc::new(block))
8✔
978
    }
32✔
979

980
    fn publish_block_event(&self, event: BlockEvent) {
32✔
981
        if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
32✔
982
            debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
×
983
        }
32✔
984
    }
32✔
985

986
    #[cfg(feature = "metrics")]
987
    async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
30✔
988
        fn update_target_difficulty(block: &ChainBlock) {
30✔
989
            match block.header().pow_algo() {
30✔
990
                PowAlgorithm::Sha3x => {
30✔
991
                    metrics::target_difficulty_sha()
30✔
992
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
30✔
993
                },
30✔
994
                PowAlgorithm::RandomXM => {
×
995
                    metrics::target_difficulty_monero_randomx()
×
996
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
×
997
                },
×
998
                PowAlgorithm::RandomXT => {
×
999
                    metrics::target_difficulty_tari_randomx()
×
1000
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
×
1001
                },
×
1002
            }
1003
        }
30✔
1004

1005
        match block_add_result {
30✔
1006
            BlockAddResult::Ok(ref block) => {
30✔
1007
                update_target_difficulty(block);
30✔
1008
                #[allow(clippy::cast_possible_wrap)]
30✔
1009
                metrics::tip_height().set(block.height() as i64);
30✔
1010
                let utxo_set_size = self.blockchain_db.utxo_count().await?;
30✔
1011
                metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
30✔
1012
            },
1013
            BlockAddResult::ChainReorg { added, removed } => {
×
1014
                if let Some(fork_height) = added.last().map(|b| b.height()) {
×
1015
                    #[allow(clippy::cast_possible_wrap)]
1016
                    metrics::tip_height().set(fork_height as i64);
×
1017
                    metrics::reorg(fork_height, added.len(), removed.len()).inc();
×
1018

1019
                    let utxo_set_size = self.blockchain_db.utxo_count().await?;
×
1020
                    metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
×
1021
                }
×
1022
                for block in added {
×
1023
                    update_target_difficulty(block);
×
1024
                }
×
1025
            },
1026
            BlockAddResult::OrphanBlock => {
×
1027
                metrics::orphaned_blocks().inc();
×
1028
            },
×
1029
            _ => {},
×
1030
        }
1031
        Ok(())
30✔
1032
    }
30✔
1033

1034
    async fn get_target_difficulty_for_next_block(
3✔
1035
        &self,
3✔
1036
        pow_algo: PowAlgorithm,
3✔
1037
        constants: &ConsensusConstants,
3✔
1038
        current_block_hash: HashOutput,
3✔
1039
    ) -> Result<Difficulty, CommsInterfaceError> {
3✔
1040
        let target_difficulty = self
3✔
1041
            .blockchain_db
3✔
1042
            .fetch_target_difficulty_for_next_block(pow_algo, current_block_hash)
3✔
1043
            .await?;
3✔
1044

1045
        let target = target_difficulty.calculate(
3✔
1046
            constants.min_pow_difficulty(pow_algo),
3✔
1047
            constants.max_pow_difficulty(pow_algo),
3✔
1048
        );
3✔
1049
        trace!(target: LOG_TARGET, "Target difficulty {} for PoW {}", target, pow_algo);
3✔
1050
        Ok(target)
3✔
1051
    }
3✔
1052

1053
    pub async fn get_last_seen_hash(&self) -> Result<FixedHash, CommsInterfaceError> {
×
1054
        self.mempool.get_last_seen_hash().await.map_err(|e| e.into())
×
1055
    }
×
1056
}
1057

1058
impl<B> Clone for InboundNodeCommsHandlers<B> {
1059
    fn clone(&self) -> Self {
128✔
1060
        Self {
128✔
1061
            block_event_sender: self.block_event_sender.clone(),
128✔
1062
            blockchain_db: self.blockchain_db.clone(),
128✔
1063
            mempool: self.mempool.clone(),
128✔
1064
            consensus_manager: self.consensus_manager.clone(),
128✔
1065
            list_of_reconciling_blocks: self.list_of_reconciling_blocks.clone(),
128✔
1066
            outbound_nci: self.outbound_nci.clone(),
128✔
1067
            connectivity: self.connectivity.clone(),
128✔
1068
            randomx_factory: self.randomx_factory.clone(),
128✔
1069
        }
128✔
1070
    }
128✔
1071
}