• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 28438804694

30 Jun 2026 10:48AM UTC coverage: 61.89% (+1.0%) from 60.884%
28438804694

push

github

web-flow
chore: upgrade anyhow (#7909)

Description
---
upgrade anyhow

72159 of 116593 relevant lines covered (61.89%)

224256.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.63
/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
1
// Copyright 2019. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
#[cfg(feature = "metrics")]
24
use std::convert::{TryFrom, TryInto};
25
use std::{
26
    cmp::max,
27
    collections::HashSet,
28
    sync::Arc,
29
    time::{Duration, Instant},
30
};
31

32
use log::*;
33
use strum_macros::Display;
34
use tari_common_types::types::{BlockHash, FixedHash, HashOutput};
35
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
36
use tari_node_components::blocks::{
37
    Block,
38
    BlockBuilder,
39
    BlockHeader,
40
    BlockHeaderValidationError,
41
    ChainBlock,
42
    NewBlock,
43
    NewBlockTemplate,
44
};
45
use tari_transaction_components::{
46
    aggregated_body::AggregateBody,
47
    consensus::ConsensusConstants,
48
    tari_proof_of_work::{Difficulty, PowAlgorithm, PowError},
49
};
50
use tari_utilities::hex::Hex;
51
use tokio::sync::{RwLock, watch};
52

53
#[cfg(feature = "metrics")]
54
use crate::base_node::metrics;
55
use crate::{
56
    base_node::comms_interface::{
57
        FetchMempoolTransactionsResponse,
58
        NodeCommsRequest,
59
        NodeCommsResponse,
60
        OutboundNodeCommsInterface,
61
        comms_response::ValidatorNodeChange,
62
        error::CommsInterfaceError,
63
        local_interface::BlockEventSender,
64
    },
65
    chain_storage::{BlockAddResult, BlockchainBackend, ChainStorageError, async_db::AsyncBlockchainDb},
66
    consensus::BaseNodeConsensusManager,
67
    mempool::{Mempool, MempoolLastSeen},
68
    proof_of_work::{
69
        cuckaroo_pow::cuckaroo_difficulty,
70
        monero_randomx_difficulty,
71
        randomx_factory::RandomXFactory,
72
        sha3x_difficulty,
73
        tari_randomx_difficulty,
74
    },
75
    validation::{ValidationError, helpers, tari_rx_vm_key_height},
76
};
77

78
const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
79
const MAX_REQUEST_BY_BLOCK_HASHES: usize = 100;
80
const MAX_REQUEST_BY_KERNEL_EXCESS_SIGS: usize = 100;
81
const MAX_REQUEST_BY_UTXO_HASHES: usize = 100;
82
const MAX_MEMPOOL_TIMEOUT: u64 = 150;
83
#[cfg(feature = "metrics")]
84
const DIFF_INDICATOR_LAG: u64 = 25;
85

86
/// Events that can be published on the Validated Block Event Stream
87
/// Broadcast is to notify subscribers if this is a valid propagated block event
88
#[derive(Debug, Clone, Display)]
89
pub enum BlockEvent {
90
    ValidBlockAdded(Arc<Block>, BlockAddResult),
91
    AddBlockValidationFailed {
92
        block: Arc<Block>,
93
        source_peer: Option<NodeId>,
94
    },
95
    AddBlockErrored {
96
        block: Arc<Block>,
97
    },
98
    BlockSyncComplete(Arc<ChainBlock>, u64),
99
    BlockSyncRewind(Vec<Arc<ChainBlock>>),
100
}
101

102
/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
103
pub struct InboundNodeCommsHandlers<B> {
104
    block_event_sender: BlockEventSender,
105
    blockchain_db: AsyncBlockchainDb<B>,
106
    mempool: Mempool,
107
    consensus_manager: BaseNodeConsensusManager,
108
    list_of_reconciling_blocks: Arc<RwLock<HashSet<HashOutput>>>,
109
    outbound_nci: OutboundNodeCommsInterface,
110
    connectivity: ConnectivityRequester,
111
    randomx_factory: RandomXFactory,
112
    // Long-lived subscription to the mempool's last-seen block (hash + height). Cloned per template request (cheap,
113
    // local) so we can wait for the mempool to catch up without re-subscribing or busy-polling.
114
    mempool_last_seen: watch::Receiver<MempoolLastSeen>,
115
    // Maximum time to wait for the mempool to catch up to the chain tip when building a new block template. Defaults
116
    // to MAX_MEMPOOL_TIMEOUT; overridable (mainly for tests) via `with_mempool_sync_timeout`.
117
    mempool_sync_timeout: Duration,
118
}
119

120
impl<B> InboundNodeCommsHandlers<B>
121
where B: BlockchainBackend + 'static
122
{
123
    /// Construct a new InboundNodeCommsInterface.
124
    pub fn new(
66✔
125
        block_event_sender: BlockEventSender,
66✔
126
        blockchain_db: AsyncBlockchainDb<B>,
66✔
127
        mempool: Mempool,
66✔
128
        consensus_manager: BaseNodeConsensusManager,
66✔
129
        outbound_nci: OutboundNodeCommsInterface,
66✔
130
        connectivity: ConnectivityRequester,
66✔
131
        randomx_factory: RandomXFactory,
66✔
132
    ) -> Self {
66✔
133
        let mempool_last_seen = mempool.subscribe_last_seen();
66✔
134
        Self {
66✔
135
            block_event_sender,
66✔
136
            blockchain_db,
66✔
137
            mempool,
66✔
138
            consensus_manager,
66✔
139
            list_of_reconciling_blocks: Arc::new(RwLock::new(HashSet::new())),
66✔
140
            outbound_nci,
66✔
141
            connectivity,
66✔
142
            randomx_factory,
66✔
143
            mempool_last_seen,
66✔
144
            mempool_sync_timeout: Duration::from_millis(MAX_MEMPOOL_TIMEOUT),
66✔
145
        }
66✔
146
    }
66✔
147

148
    /// Override how long `GetNewBlockTemplate` will wait for the mempool to catch up to the chain tip. The production
149
    /// default (`MAX_MEMPOOL_TIMEOUT`) is intentionally short; tests use this to wait deterministically without racing
150
    /// that deadline.
151
    pub fn with_mempool_sync_timeout(mut self, timeout: Duration) -> Self {
1✔
152
        self.mempool_sync_timeout = timeout;
1✔
153
        self
1✔
154
    }
1✔
155

156
    /// Handle inbound node comms requests from remote nodes and local services.
157
    #[allow(clippy::too_many_lines)]
158
    pub async fn handle_request(&self, request: NodeCommsRequest) -> Result<NodeCommsResponse, CommsInterfaceError> {
131✔
159
        trace!(target: LOG_TARGET, "Handling remote request {request}");
131✔
160
        match request {
131✔
161
            NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata(
162
                self.blockchain_db.get_chain_metadata().await?,
110✔
163
            )),
164
            NodeCommsRequest::GetTargetDifficultyNextBlock(algo) => {
×
165
                let header = self.blockchain_db.fetch_tip_header().await?;
×
166
                let constants = self.consensus_manager.consensus_constants(header.header().height);
×
167
                let target_difficulty = self
×
168
                    .get_target_difficulty_for_next_block(algo, constants, *header.hash())
×
169
                    .await?;
×
170
                Ok(NodeCommsResponse::TargetDifficulty(target_difficulty))
×
171
            },
172
            NodeCommsRequest::FetchHeaders(range) => {
1✔
173
                let headers = self.blockchain_db.fetch_chain_headers(range).await?;
1✔
174
                Ok(NodeCommsResponse::BlockHeaders(headers))
1✔
175
            },
176
            NodeCommsRequest::FetchHeadersByHashes(block_hashes) => {
×
177
                if block_hashes.len() > MAX_REQUEST_BY_BLOCK_HASHES {
×
178
                    return Err(CommsInterfaceError::InvalidRequest {
×
179
                        request: "FetchHeadersByHashes",
×
180
                        details: format!(
×
181
                            "Exceeded maximum block hashes request (max: {}, got:{})",
×
182
                            MAX_REQUEST_BY_BLOCK_HASHES,
×
183
                            block_hashes.len()
×
184
                        ),
×
185
                    });
×
186
                }
×
187
                let mut block_headers = Vec::with_capacity(block_hashes.len());
×
188
                for block_hash in block_hashes {
×
189
                    let block_hex = block_hash.to_hex();
×
190
                    match self.blockchain_db.fetch_chain_header_by_block_hash(block_hash).await? {
×
191
                        Some(block_header) => {
×
192
                            block_headers.push(block_header);
×
193
                        },
×
194
                        None => {
195
                            error!(target: LOG_TARGET, "Could not fetch headers with hashes:{block_hex}");
×
196
                            return Err(CommsInterfaceError::InternalError(format!(
×
197
                                "Could not fetch headers with hashes:{block_hex}"
×
198
                            )));
×
199
                        },
200
                    }
201
                }
202
                Ok(NodeCommsResponse::BlockHeaders(block_headers))
×
203
            },
204
            NodeCommsRequest::FetchMatchingUtxos(utxo_hashes) => {
1✔
205
                let mut res = Vec::with_capacity(utxo_hashes.len());
1✔
206
                for (output, spent) in (self
1✔
207
                    .blockchain_db
1✔
208
                    .fetch_outputs_with_spend_status_at_tip(utxo_hashes)
1✔
209
                    .await?)
1✔
210
                    .into_iter()
1✔
211
                    .flatten()
1✔
212
                {
213
                    if !spent {
1✔
214
                        res.push(output);
1✔
215
                    }
1✔
216
                }
217
                Ok(NodeCommsResponse::TransactionOutputs(res))
1✔
218
            },
219
            NodeCommsRequest::FetchMatchingBlocks { range, compact } => {
3✔
220
                let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
3✔
221
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
3✔
222
            },
223
            NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => {
×
224
                if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS {
×
225
                    return Err(CommsInterfaceError::InvalidRequest {
×
226
                        request: "FetchBlocksByKernelExcessSigs",
×
227
                        details: format!(
×
228
                            "Exceeded maximum number of kernel excess sigs in request (max: {}, got:{})",
×
229
                            MAX_REQUEST_BY_KERNEL_EXCESS_SIGS,
×
230
                            excess_sigs.len()
×
231
                        ),
×
232
                    });
×
233
                }
×
234
                let mut blocks = Vec::with_capacity(excess_sigs.len());
×
235
                for sig in excess_sigs {
×
236
                    let sig_hex = sig.get_signature().to_hex();
×
237
                    debug!(
×
238
                        target: LOG_TARGET,
×
239
                        "A peer has requested a block with kernel with sig {sig_hex}"
240
                    );
241
                    match self.blockchain_db.fetch_block_with_kernel(sig).await {
×
242
                        Ok(Some(block)) => blocks.push(block),
×
243
                        Ok(None) => warn!(
×
244
                            target: LOG_TARGET,
×
245
                            "Could not provide requested block containing kernel with sig {sig_hex} to peer because not \
246
                             stored"
247
                        ),
248
                        Err(e) => warn!(
×
249
                            target: LOG_TARGET,
×
250
                            "Could not provide requested block containing kernel with sig {sig_hex} to peer because: {e}"
251
                        ),
252
                    }
253
                }
254
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
×
255
            },
256
            NodeCommsRequest::FetchBlocksByUtxos(commitments) => {
×
257
                if commitments.len() > MAX_REQUEST_BY_UTXO_HASHES {
×
258
                    return Err(CommsInterfaceError::InvalidRequest {
×
259
                        request: "FetchBlocksByUtxos",
×
260
                        details: format!(
×
261
                            "Exceeded maximum number of utxo hashes in request (max: {}, got:{})",
×
262
                            MAX_REQUEST_BY_UTXO_HASHES,
×
263
                            commitments.len()
×
264
                        ),
×
265
                    });
×
266
                }
×
267
                let mut blocks = Vec::with_capacity(commitments.len());
×
268
                for commitment in commitments {
×
269
                    let commitment_hex = commitment.to_hex();
×
270
                    debug!(
×
271
                        target: LOG_TARGET,
×
272
                        "A peer has requested a block with commitment {commitment_hex}",
273
                    );
274
                    match self.blockchain_db.fetch_block_with_utxo(commitment).await {
×
275
                        Ok(Some(block)) => blocks.push(block),
×
276
                        Ok(None) => warn!(
×
277
                            target: LOG_TARGET,
×
278
                            "Could not provide requested block with commitment {commitment_hex} because not stored"
279
                        ),
280
                        Err(e) => warn!(
×
281
                            target: LOG_TARGET,
×
282
                            "Could not provide requested block with commitment {commitment_hex} because: {e}"
283
                        ),
284
                    }
285
                }
286
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
×
287
            },
288
            NodeCommsRequest::GetHeaderByHash(hash) => {
×
289
                let header = self.blockchain_db.fetch_chain_header_by_block_hash(hash).await?;
×
290
                Ok(NodeCommsResponse::BlockHeader(header))
×
291
            },
292
            NodeCommsRequest::GetBlockByHash(hash) => {
×
293
                let block = self.blockchain_db.fetch_block_by_hash(hash, false).await?;
×
294
                Ok(NodeCommsResponse::HistoricalBlock(Box::new(block)))
×
295
            },
296
            NodeCommsRequest::GetNewBlockTemplate(request) => {
4✔
297
                // We need the mempool to have seen the latest base node tip before we can be confident the template is
298
                // correct. Rather than busy-polling the mempool, we wait on its last-seen subscription. If the mempool
299
                // advances *past* our captured tip (a newer block was added while we were preparing), the tip - and any
300
                // template built on it - is stale, so we re-fetch a fresher tip and try again. The whole operation is
301
                // bounded by a single overall deadline, after which we proceed with whatever tip we have.
302
                //
303
                // Clone the long-lived subscription (set up at startup). Cloning marks the new receiver as having seen
304
                // the current value, so we read it first and only then wait for the *next* change - an update landing
305
                // between the read and the wait cannot be missed.
306
                let mut last_seen_rx = self.mempool_last_seen.clone();
4✔
307
                let deadline = Instant::now() + self.mempool_sync_timeout;
4✔
308

309
                let best_block_header;
310
                let is_mempool_synced;
311
                // The tip height evaluated on the previous iteration; lets us tell whether a re-fetch actually advanced
312
                // the tip, so we only re-fetch on genuine progress (and wait otherwise) instead of spinning.
313
                let mut prev_tip_height: Option<u64> = None;
4✔
314
                loop {
315
                    let header = self.blockchain_db.fetch_tip_header().await?;
5✔
316
                    let tip_hash = *header.hash();
5✔
317
                    let tip_height = header.height();
5✔
318
                    // MempoolLastSeen is Copy, so this releases the receiver borrow immediately (never held over
319
                    // .await).
320
                    let seen = *last_seen_rx.borrow_and_update();
5✔
321

322
                    // A default hash means the mempool has not processed any block yet, so there is nothing to wait
323
                    // for.
324
                    if seen.hash == tip_hash || seen.hash == FixedHash::default() {
5✔
325
                        best_block_header = header;
4✔
326
                        is_mempool_synced = true;
4✔
327
                        break;
4✔
328
                    }
1✔
329

330
                    // Mempool is ahead of our tip: a newer block landed. Re-fetch a fresher tip, but only if the last
331
                    // re-fetch made progress - otherwise fall through and wait for the next mempool update so we don't
332
                    // spin re-fetching an unchanged tip.
333
                    if seen.height > tip_height && prev_tip_height != Some(tip_height) && Instant::now() < deadline {
1✔
334
                        prev_tip_height = Some(tip_height);
×
335
                        continue;
×
336
                    }
1✔
337

338
                    let remaining = deadline.saturating_duration_since(Instant::now());
1✔
339
                    if remaining.is_zero() {
1✔
340
                        best_block_header = header;
×
341
                        is_mempool_synced = false;
×
342
                        break;
×
343
                    }
1✔
344
                    prev_tip_height = Some(tip_height);
1✔
345
                    // `changed()` errors only once the mempool (sender) is dropped, in which case we stop waiting.
346
                    match tokio::time::timeout(remaining, last_seen_rx.changed()).await {
1✔
347
                        Ok(Ok(())) => continue,
1✔
348
                        Ok(Err(_)) | Err(_) => {
349
                            best_block_header = header;
×
350
                            is_mempool_synced = false;
×
351
                            break;
×
352
                        },
353
                    }
354
                }
355

356
                if !is_mempool_synced {
4✔
357
                    warn!(
×
358
                        target: LOG_TARGET,
×
359
                        "Mempool out of sync - last seen hash '{}' does not match the tip hash '{}'. This condition \
360
                         should auto correct with the next block template request",
361
                        last_seen_rx.borrow().hash, best_block_header.hash()
×
362
                    );
363
                }
4✔
364
                let mut header = BlockHeader::from_previous(best_block_header.header());
4✔
365
                let constants = self.consensus_manager.consensus_constants(header.height);
4✔
366
                header.version = constants.blockchain_version().into();
4✔
367
                header.pow.pow_algo = request.algo;
4✔
368

369
                let constants_weight = constants.max_block_transaction_weight();
4✔
370
                let asking_weight = if request.max_weight > constants_weight || request.max_weight == 0 {
4✔
371
                    constants_weight
4✔
372
                } else {
373
                    request.max_weight
×
374
                };
375

376
                debug!(
4✔
377
                    target: LOG_TARGET,
×
378
                    "Fetching transactions with a maximum weight of {asking_weight} for the template"
379
                );
380
                let transactions = self
4✔
381
                    .mempool
4✔
382
                    .retrieve(asking_weight)
4✔
383
                    .await?
4✔
384
                    .into_iter()
4✔
385
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
8✔
386
                    .collect::<Vec<_>>();
4✔
387

388
                debug!(
4✔
389
                    target: LOG_TARGET,
×
390
                    "Adding {} transaction(s) to new block template",
391
                    transactions.len(),
×
392
                );
393

394
                let prev_hash = header.prev_hash;
4✔
395
                let height = header.height;
4✔
396

397
                let block = header.into_builder().with_transactions(transactions).build();
4✔
398
                let block_hash = block.hash();
4✔
399
                let block_template = NewBlockTemplate::from_block(
4✔
400
                    block,
4✔
401
                    self.get_target_difficulty_for_next_block(request.algo, constants, prev_hash)
4✔
402
                        .await?,
4✔
403
                    self.consensus_manager.get_block_reward_at(height),
4✔
404
                    is_mempool_synced,
4✔
405
                )?;
×
406

407
                debug!(target: LOG_TARGET,
4✔
408
                    "New block template requested and prepared at height: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
409
                    block_template.header.height,
410
                    block_template.target_difficulty,
411
                    block_hash.to_hex(),
×
412
                    block_template
×
413
                        .body
×
414
                        .calculate_weight(constants.transaction_weight_params())
×
415
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
×
416
                    block_template.body.to_counts_string()
×
417
                );
418

419
                Ok(NodeCommsResponse::NewBlockTemplate(block_template))
4✔
420
            },
421
            NodeCommsRequest::GetNewBlock(block_template) => {
3✔
422
                let height = block_template.header.height;
3✔
423
                let target_difficulty = block_template.target_difficulty;
3✔
424
                let block = self.blockchain_db.prepare_new_block(block_template).await?;
3✔
425
                let constants = self.consensus_manager.consensus_constants(block.header.height);
3✔
426
                debug!(target: LOG_TARGET,
3✔
427
                    "Prepared block: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
428
                    height,
429
                    target_difficulty,
430
                    block.hash().to_hex(),
×
431
                    block
×
432
                        .body
×
433
                        .calculate_weight(constants.transaction_weight_params())
×
434
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
×
435
                    block.body.to_counts_string()
×
436
                );
437
                Ok(NodeCommsResponse::NewBlock {
3✔
438
                    success: true,
3✔
439
                    error: None,
3✔
440
                    block: Some(block),
3✔
441
                })
3✔
442
            },
443
            NodeCommsRequest::GetBlockFromAllChains(hash) => {
4✔
444
                let block_hex = hash.to_hex();
4✔
445
                debug!(
4✔
446
                    target: LOG_TARGET,
×
447
                    "A peer has requested a block with hash {block_hex}"
448
                );
449

450
                #[allow(clippy::blocks_in_conditions)]
451
                let maybe_block = match self
4✔
452
                    .blockchain_db
4✔
453
                    .fetch_block_by_hash(hash, true)
4✔
454
                    .await
4✔
455
                    .unwrap_or_else(|e| {
4✔
456
                        warn!(
×
457
                            target: LOG_TARGET,
×
458
                            "Could not provide requested block {block_hex} to peer because: {e}",
459
                        );
460

461
                        None
×
462
                    }) {
×
463
                    None => self.blockchain_db.fetch_orphan(hash).await.map_or_else(
1✔
464
                        |e| {
1✔
465
                            warn!(
1✔
466
                                target: LOG_TARGET,
×
467
                                "Could not provide requested block {block_hex} to peer because: {e}"
468
                            );
469

470
                            None
1✔
471
                        },
1✔
472
                        Some,
473
                    ),
474
                    Some(block) => Some(block.into_block()),
3✔
475
                };
476

477
                Ok(NodeCommsResponse::Block(Box::new(maybe_block)))
4✔
478
            },
479
            NodeCommsRequest::FetchKernelByExcessSig(signature) => {
1✔
480
                let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await {
1✔
481
                    Ok(Some((kernel, _))) => vec![kernel],
1✔
482
                    Ok(None) => vec![],
×
483
                    Err(err) => {
×
484
                        error!(target: LOG_TARGET, "Could not fetch kernel {err}");
×
485
                        return Err(err.into());
×
486
                    },
487
                };
488

489
                Ok(NodeCommsResponse::TransactionKernels(kernels))
1✔
490
            },
491
            NodeCommsRequest::FetchMempoolTransactionsByExcessSigs { excess_sigs } => {
4✔
492
                let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
4✔
493
                Ok(NodeCommsResponse::FetchMempoolTransactionsByExcessSigsResponse(
4✔
494
                    FetchMempoolTransactionsResponse {
4✔
495
                        transactions,
4✔
496
                        not_found,
4✔
497
                    },
4✔
498
                ))
4✔
499
            },
500
            NodeCommsRequest::FetchValidatorNodesKeys {
501
                height,
×
502
                validator_network,
×
503
            } => {
504
                let active_validator_nodes = self
×
505
                    .blockchain_db
×
506
                    .fetch_active_validator_nodes(height, validator_network)
×
507
                    .await?;
×
508
                Ok(NodeCommsResponse::FetchValidatorNodesKeysResponse(
×
509
                    active_validator_nodes,
×
510
                ))
×
511
            },
512
            NodeCommsRequest::GetValidatorNode {
513
                sidechain_id,
×
514
                public_key,
×
515
            } => {
516
                let vn = self.blockchain_db.get_validator_node(sidechain_id, public_key).await?;
×
517
                Ok(NodeCommsResponse::GetValidatorNode(vn))
×
518
            },
519
            NodeCommsRequest::FetchTemplateRegistrations {
520
                start_height,
×
521
                end_height,
×
522
            } => {
523
                let template_registrations = self
×
524
                    .blockchain_db
×
525
                    .fetch_template_registrations(start_height..=end_height)
×
526
                    .await?;
×
527
                Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
×
528
                    template_registrations,
×
529
                ))
×
530
            },
531
            NodeCommsRequest::FetchUnspentUtxosInBlock { block_hash } => {
×
532
                let utxos = self.blockchain_db.fetch_outputs_in_block(block_hash).await?;
×
533
                Ok(NodeCommsResponse::TransactionOutputs(utxos))
×
534
            },
535
            NodeCommsRequest::FetchMinedInfoByPayRef(payref) => {
×
536
                let output_info = self.blockchain_db.fetch_mined_info_by_payref(payref).await?;
×
537
                Ok(NodeCommsResponse::MinedInfo(output_info))
×
538
            },
539
            NodeCommsRequest::FetchMinedInfoByOutputHash(output_hash) => {
×
540
                let output_info = self.blockchain_db.fetch_mined_info_by_output_hash(output_hash).await?;
×
541
                Ok(NodeCommsResponse::MinedInfo(output_info))
×
542
            },
543
            NodeCommsRequest::FetchOutputMinedInfo(output_hash) => {
×
544
                let output_info = self.blockchain_db.fetch_output(output_hash).await?;
×
545
                Ok(NodeCommsResponse::OutputMinedInfo(output_info))
×
546
            },
547
            NodeCommsRequest::CheckOutputSpentStatus(output_hash) => {
×
548
                let input_info = self.blockchain_db.fetch_input(output_hash).await?;
×
549
                Ok(NodeCommsResponse::InputMinedInfo(input_info))
×
550
            },
551
            NodeCommsRequest::FetchValidatorNodeChanges { epoch, sidechain_id } => {
×
552
                let added_validators = self
×
553
                    .blockchain_db
×
554
                    .fetch_validators_activating_in_epoch(sidechain_id.clone(), epoch)
×
555
                    .await?;
×
556

557
                let exit_validators = self
×
558
                    .blockchain_db
×
559
                    .fetch_validators_exiting_in_epoch(sidechain_id.clone(), epoch)
×
560
                    .await?;
×
561

562
                info!(
×
563
                    target: LOG_TARGET,
×
564
                    "Fetched {} validators activating and {} validators exiting in epoch {}",
565
                    added_validators.len(),
×
566
                    exit_validators.len(),
×
567
                    epoch,
568
                );
569

570
                let mut node_changes = Vec::with_capacity(added_validators.len() + exit_validators.len());
×
571

572
                node_changes.extend(added_validators.into_iter().map(|vn| ValidatorNodeChange::Add {
×
573
                    registration: vn.original_registration.into(),
×
574
                    activation_epoch: vn.activation_epoch,
×
575
                    minimum_value_promise: vn.minimum_value_promise,
×
576
                    shard_key: vn.shard_key,
×
577
                }));
×
578

579
                node_changes.extend(exit_validators.into_iter().map(|vn| ValidatorNodeChange::Remove {
×
580
                    public_key: vn.public_key,
×
581
                }));
×
582

583
                Ok(NodeCommsResponse::FetchValidatorNodeChangesResponse(node_changes))
×
584
            },
585
        }
586
    }
131✔
587

588
    /// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous
589
    /// requests for the full block.
590
    /// This may (asynchronously) block until the other request(s) complete or time out and so should typically be
591
    /// executed in a dedicated task.
592
    pub async fn handle_new_block_message(
24✔
593
        &mut self,
24✔
594
        new_block: NewBlock,
24✔
595
        source_peer: NodeId,
24✔
596
    ) -> Result<(), CommsInterfaceError> {
24✔
597
        let block_hash = new_block.header.hash();
24✔
598

599
        if self.blockchain_db.inner().is_add_block_disabled() {
24✔
600
            info!(
×
601
                target: LOG_TARGET,
×
602
                "Ignoring block message ({}) because add_block is locked",
603
                block_hash.to_hex()
×
604
            );
605
            return Ok(());
×
606
        }
24✔
607

608
        // Lets check if the block exists before we try and ask for a complete block
609
        if self.check_exists_and_not_bad_block(block_hash).await? {
24✔
610
            return Ok(());
×
611
        }
24✔
612

613
        // lets check that the difficulty at least matches 50% of the tip header. The max difficulty drop is 16%, thus
614
        // 50% is way more than that and in order to attack the node, you need 50% of the mining power. We cannot check
615
        // the target difficulty as orphan blocks dont have a target difficulty. All we care here is that bad
616
        // blocks are not free to make, and that they are more expensive to make then they are to validate. As
617
        // soon as a block can be linked to the main chain, a proper full proof of work check will
618
        // be done before any other validation.
619
        self.check_min_block_difficulty(&new_block).await?;
24✔
620

621
        {
622
            // we use a double lock to make sure we can only reconcile one unique block at a time. We may receive the
623
            // same block from multiple peer near simultaneously. We should only reconcile each unique block once.
624
            let read_lock = self.list_of_reconciling_blocks.read().await;
24✔
625
            if read_lock.contains(&block_hash) {
24✔
626
                debug!(
×
627
                    target: LOG_TARGET,
×
628
                    "Block with hash `{}` is already being reconciled",
629
                    block_hash.to_hex()
×
630
                );
631
                return Ok(());
×
632
            }
24✔
633
        }
634
        {
635
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
24✔
636
            if self.check_exists_and_not_bad_block(block_hash).await? {
24✔
637
                return Ok(());
1✔
638
            }
23✔
639

640
            if !write_lock.insert(block_hash) {
23✔
641
                debug!(
×
642
                    target: LOG_TARGET,
×
643
                    "Block with hash `{}` is already being reconciled",
644
                    block_hash.to_hex()
×
645
                );
646
                return Ok(());
×
647
            }
23✔
648
        }
649

650
        debug!(
23✔
651
            target: LOG_TARGET,
×
652
            "Block with hash `{}` is unknown. Constructing block from known mempool transactions / requesting missing \
653
             transactions from peer '{}'.",
654
            block_hash.to_hex(),
×
655
            source_peer
656
        );
657

658
        let result = self.reconcile_and_add_block(source_peer.clone(), new_block).await;
23✔
659

660
        {
661
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
23✔
662
            write_lock.remove(&block_hash);
23✔
663
        }
664
        result?;
23✔
665
        Ok(())
20✔
666
    }
24✔
667

668
    async fn check_min_block_difficulty(&self, new_block: &NewBlock) -> Result<(), CommsInterfaceError> {
24✔
669
        let constants = self.consensus_manager.consensus_constants(new_block.header.height);
24✔
670
        let gen_hash = *self.consensus_manager.get_genesis_block().hash();
24✔
671
        let mut min_difficulty = constants.min_pow_difficulty(new_block.header.pow.pow_algo);
24✔
672
        let mut header = self.blockchain_db.fetch_last_chain_header().await?;
24✔
673
        loop {
674
            if new_block.header.pow_algo() == header.header().pow_algo() {
24✔
675
                min_difficulty = max(
24✔
676
                    header
24✔
677
                        .accumulated_data()
24✔
678
                        .target_difficulty
24✔
679
                        .checked_div_u64(2)
24✔
680
                        .unwrap_or(min_difficulty),
24✔
681
                    min_difficulty,
24✔
682
                );
24✔
683
                break;
24✔
684
            }
×
685
            if header.height() == 0 {
×
686
                break;
×
687
            }
×
688
            // we have not reached gen block, and the pow algo does not match, so lets go further back
689
            header = self
×
690
                .blockchain_db
×
691
                .fetch_chain_header(header.height().saturating_sub(1))
×
692
                .await?;
×
693
        }
694
        let achieved = match new_block.header.pow_algo() {
24✔
695
            PowAlgorithm::RandomXM => monero_randomx_difficulty(
×
696
                &new_block.header,
×
697
                &self.randomx_factory,
×
698
                &gen_hash,
×
699
                &self.consensus_manager,
×
700
            )?,
×
701
            PowAlgorithm::Sha3x => sha3x_difficulty(&new_block.header)?,
24✔
702
            PowAlgorithm::RandomXT => {
703
                let vm_key = *self
×
704
                    .blockchain_db
×
705
                    .fetch_chain_header(tari_rx_vm_key_height(new_block.header.height))
×
706
                    .await?
×
707
                    .hash();
×
708
                tari_randomx_difficulty(&new_block.header, &self.randomx_factory, &vm_key)?
×
709
            },
710
            PowAlgorithm::Cuckaroo => {
711
                let constants = self.consensus_manager.consensus_constants(new_block.header.height);
×
712
                let cuckaroo_cycle = constants.cuckaroo_cycle_length();
×
713
                let edge_bits = constants.cuckaroo_edge_bits();
×
714
                cuckaroo_difficulty(&new_block.header, cuckaroo_cycle, edge_bits)?
×
715
            },
716
        };
717
        if achieved < min_difficulty {
24✔
718
            debug!(
×
719
                target: LOG_TARGET,
×
720
                "Block failed with invalid pow: {new_block}"
721
            );
722
            return Err(CommsInterfaceError::InvalidBlockHeader(
×
723
                BlockHeaderValidationError::ProofOfWorkError(PowError::AchievedDifficultyBelowMin {
×
724
                    minimum: min_difficulty,
×
725
                    achieved,
×
726
                }),
×
727
            ));
×
728
        }
24✔
729
        Ok(())
24✔
730
    }
24✔
731

732
    async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
48✔
733
        if self.blockchain_db.chain_header_or_orphan_exists(block).await? {
48✔
734
            debug!(
1✔
735
                target: LOG_TARGET,
×
736
                "Block with hash `{}` already stored",
737
                block.to_hex()
×
738
            );
739
            return Ok(true);
1✔
740
        }
47✔
741
        let (is_bad_block, reason) = self.blockchain_db.bad_block_exists(block).await?;
47✔
742
        if is_bad_block {
47✔
743
            debug!(
×
744
                target: LOG_TARGET,
×
745
                "Block with hash `{}` already validated as a bad block due to `{}`",
746
                block.to_hex(), reason
×
747
            );
748
            return Err(CommsInterfaceError::ChainStorageError(
×
749
                ChainStorageError::ValidationError {
×
750
                    source: ValidationError::BadBlockFound {
×
751
                        hash: block.to_hex(),
×
752
                        reason,
×
753
                    },
×
754
                },
×
755
            ));
×
756
        }
47✔
757
        Ok(false)
47✔
758
    }
48✔
759

760
    async fn reconcile_and_add_block(
23✔
761
        &mut self,
23✔
762
        source_peer: NodeId,
23✔
763
        new_block: NewBlock,
23✔
764
    ) -> Result<(), CommsInterfaceError> {
23✔
765
        let block = self.reconcile_block(source_peer.clone(), new_block).await?;
23✔
766
        self.handle_block(block, Some(source_peer)).await?;
22✔
767
        Ok(())
20✔
768
    }
23✔
769

770
    #[allow(clippy::too_many_lines)]
771
    async fn reconcile_block(
23✔
772
        &mut self,
23✔
773
        source_peer: NodeId,
23✔
774
        new_block: NewBlock,
23✔
775
    ) -> Result<Block, CommsInterfaceError> {
23✔
776
        let NewBlock {
777
            header,
23✔
778
            coinbase_kernels,
23✔
779
            coinbase_outputs,
23✔
780
            kernel_excess_sigs: excess_sigs,
23✔
781
        } = new_block;
23✔
782
        // If the block is empty, we dont have to ask for the block, as we already have the full block available
783
        // to us.
784
        if excess_sigs.is_empty() {
23✔
785
            let block = BlockBuilder::new(header.version)
17✔
786
                .add_outputs(coinbase_outputs)
17✔
787
                .add_kernels(coinbase_kernels)
17✔
788
                .with_header(header)
17✔
789
                .build();
17✔
790
            return Ok(block);
17✔
791
        }
6✔
792

793
        let block_hash = header.hash();
6✔
794
        // We check the current tip and orphan status of the block because we cannot guarantee that mempool state is
795
        // correct and the mmr root calculation is only valid if the block is building on the tip.
796
        let current_meta = self.blockchain_db.get_chain_metadata().await?;
6✔
797
        if header.prev_hash != *current_meta.best_block_hash() {
6✔
798
            debug!(
×
799
                target: LOG_TARGET,
×
800
                "Orphaned block #{}: ({}), current tip is: #{} ({}). We need to fetch the complete block from peer: \
801
                 ({})",
802
                header.height,
803
                block_hash.to_hex(),
×
804
                current_meta.best_block_height(),
×
805
                current_meta.best_block_hash().to_hex(),
×
806
                source_peer,
807
            );
808
            #[allow(clippy::cast_possible_wrap)]
809
            #[cfg(feature = "metrics")]
810
            metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
×
811
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
×
812
            return Ok(block);
×
813
        }
6✔
814

815
        // We know that the block is neither and orphan or a coinbase, so lets ask our mempool for the transactions
816
        let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
6✔
817
        let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();
6✔
818

819
        #[allow(clippy::cast_possible_wrap)]
820
        #[cfg(feature = "metrics")]
821
        metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);
6✔
822

823
        let mut builder = BlockBuilder::new(header.version)
6✔
824
            .add_outputs(coinbase_outputs)
6✔
825
            .add_kernels(coinbase_kernels)
6✔
826
            .with_transactions(known_transactions);
6✔
827

828
        if missing_excess_sigs.is_empty() {
6✔
829
            debug!(
2✔
830
                target: LOG_TARGET,
×
831
                "All transactions for block #{} ({}) found in mempool",
832
                header.height,
833
                block_hash.to_hex()
×
834
            );
835
        } else {
836
            debug!(
4✔
837
                target: LOG_TARGET,
×
838
                "Requesting {} unknown transaction(s) from peer '{}'.",
839
                missing_excess_sigs.len(),
×
840
                source_peer
841
            );
842

843
            let FetchMempoolTransactionsResponse {
844
                transactions,
4✔
845
                not_found,
4✔
846
            } = self
4✔
847
                .outbound_nci
4✔
848
                .request_transactions_by_excess_sig(source_peer.clone(), missing_excess_sigs)
4✔
849
                .await?;
4✔
850

851
            // Add returned transactions to unconfirmed pool
852
            if !transactions.is_empty() {
4✔
853
                self.mempool.insert_all(transactions.clone()).await?;
×
854
            }
4✔
855

856
            if !not_found.is_empty() {
4✔
857
                warn!(
4✔
858
                    target: LOG_TARGET,
×
859
                    "Peer {} was not able to return all transactions for block #{} ({}). {} transaction(s) not found. \
860
                     Requesting full block.",
861
                    source_peer,
862
                    header.height,
863
                    block_hash.to_hex(),
×
864
                    not_found.len()
×
865
                );
866

867
                #[cfg(feature = "metrics")]
868
                metrics::compact_block_full_misses(header.height).inc();
4✔
869
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
4✔
870
                return Ok(block);
3✔
871
            }
×
872

873
            builder = builder.with_transactions(
×
874
                transactions
×
875
                    .into_iter()
×
876
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
×
877
                    .collect(),
×
878
            );
879
        }
880

881
        // NB: Add the header last because `with_transactions` etc updates the current header, but we have the final one
882
        // already
883
        builder = builder.with_header(header.clone());
2✔
884
        let block = builder.build();
2✔
885

886
        // Perform a sanity check on the reconstructed block, if the MMR roots don't match then it's possible one or
887
        // more transactions in our mempool had the same excess/signature for a *different* transaction.
888
        // This is extremely unlikely, but still possible. In case of a mismatch, request the full block from the peer.
889
        let (block, mmr_roots) = match self.blockchain_db.calculate_mmr_roots(block).await {
2✔
890
            Err(_) => {
891
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
×
892
                return Ok(block);
×
893
            },
894
            Ok(v) => v,
2✔
895
        };
896
        if let Err(e) = helpers::check_mmr_roots(&header, &mmr_roots) {
2✔
897
            warn!(
×
898
                target: LOG_TARGET,
×
899
                "Reconstructed block #{} ({}) failed MMR check validation!. Requesting full block. Error: {}",
900
                header.height,
901
                block_hash.to_hex(),
×
902
                e,
903
            );
904

905
            #[cfg(feature = "metrics")]
906
            metrics::compact_block_mmr_mismatch(header.height).inc();
×
907
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
×
908
            return Ok(block);
×
909
        }
2✔
910

911
        Ok(block)
2✔
912
    }
23✔
913

914
    async fn request_full_block_from_peer(
4✔
915
        &mut self,
4✔
916
        source_peer: NodeId,
4✔
917
        block_hash: BlockHash,
4✔
918
    ) -> Result<Block, CommsInterfaceError> {
4✔
919
        match self
4✔
920
            .outbound_nci
4✔
921
            .request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone()))
4✔
922
            .await
4✔
923
        {
924
            Ok(Some(block)) => Ok(block),
3✔
925
            Ok(None) => {
926
                debug!(
1✔
927
                    target: LOG_TARGET,
×
928
                    "Peer `{source_peer}` failed to return the block that was requested."
929
                );
930
                Err(CommsInterfaceError::InvalidPeerResponse(format!(
1✔
931
                    "Invalid response from peer `{source_peer}`: Peer failed to provide the block that was propagated"
1✔
932
                )))
1✔
933
            },
934
            Err(CommsInterfaceError::UnexpectedApiResponse) => {
935
                debug!(
×
936
                    target: LOG_TARGET,
×
937
                    "Peer `{source_peer}` sent unexpected API response."
938
                );
939
                Err(CommsInterfaceError::UnexpectedApiResponse)
×
940
            },
941
            Err(e) => Err(e),
×
942
        }
943
    }
4✔
944

945
    /// Handle inbound blocks from remote nodes and local services.
946
    ///
947
    /// ## Arguments
948
    /// block - the block to store
949
    /// new_block_msg - propagate this new block message
950
    /// source_peer - the peer that sent this new block message, or None if the block was generated by a local miner
951
    pub async fn handle_block(
30✔
952
        &mut self,
30✔
953
        block: Block,
30✔
954
        source_peer: Option<NodeId>,
30✔
955
    ) -> Result<BlockHash, CommsInterfaceError> {
30✔
956
        let block_hash = block.hash();
30✔
957
        let block_height = block.header.height;
30✔
958

959
        info!(
30✔
960
            target: LOG_TARGET,
×
961
            "Block #{} ({}) received from {}",
962
            block_height,
963
            block_hash.to_hex(),
×
964
            source_peer
×
965
                .as_ref()
×
966
                .map(|p| format!("remote peer: {p}"))
×
967
                .unwrap_or_else(|| "local services".to_string())
×
968
        );
969
        debug!(target: LOG_TARGET, "Incoming block: {block}");
30✔
970
        let timer = Instant::now();
30✔
971
        let block = self.hydrate_block(block).await?;
30✔
972

973
        let add_block_result = self.blockchain_db.add_block(block.clone()).await;
30✔
974
        // Create block event on block event stream
975
        match add_block_result {
2✔
976
            Ok(block_add_result) => {
28✔
977
                debug!(
28✔
978
                    target: LOG_TARGET,
×
979
                    "Block #{} ({}) added ({}) to blockchain in {:.2?}",
980
                    block_height,
981
                    block_hash.to_hex(),
×
982
                    block_add_result,
983
                    timer.elapsed()
×
984
                );
985

986
                let should_propagate = match &block_add_result {
28✔
987
                    BlockAddResult::Ok(_) => true,
28✔
988
                    BlockAddResult::BlockExists => false,
×
989
                    BlockAddResult::OrphanBlock => false,
×
990
                    BlockAddResult::ChainReorg { .. } => true,
×
991
                };
992

993
                #[cfg(feature = "metrics")]
994
                self.update_block_result_metrics(&block_add_result).await?;
28✔
995

996
                self.publish_block_event(BlockEvent::ValidBlockAdded(block.clone(), block_add_result));
28✔
997

998
                if should_propagate {
28✔
999
                    debug!(
28✔
1000
                        target: LOG_TARGET,
×
1001
                        "Propagate block ({}) to network.",
1002
                        block_hash.to_hex()
×
1003
                    );
1004
                    let exclude_peers = source_peer.into_iter().collect();
28✔
1005
                    let new_block_msg = NewBlock::from(&*block);
28✔
1006
                    if let Err(e) = self.outbound_nci.propagate_block(new_block_msg, exclude_peers).await {
28✔
1007
                        warn!(
×
1008
                            target: LOG_TARGET,
×
1009
                            "Failed to propagate block ({}) to network: {}.",
1010
                            block_hash.to_hex(), e
×
1011
                        );
1012
                    }
28✔
1013
                }
×
1014
                Ok(block_hash)
28✔
1015
            },
1016

1017
            Err(e @ ChainStorageError::ValidationError { .. }) => {
2✔
1018
                #[cfg(feature = "metrics")]
1019
                {
2✔
1020
                    let block_hash = block.hash();
2✔
1021
                    metrics::rejected_blocks(block.header.height, &block_hash).inc();
2✔
1022
                }
2✔
1023
                warn!(
2✔
1024
                    target: LOG_TARGET,
×
1025
                    "Peer {} sent an invalid block: {}",
1026
                    source_peer
×
1027
                        .as_ref()
×
1028
                        .map(ToString::to_string)
×
1029
                        .unwrap_or_else(|| "<local request>".to_string()),
×
1030
                    e
1031
                );
1032
                self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
2✔
1033
                Err(e.into())
2✔
1034
            },
1035

1036
            Err(e) => {
×
1037
                #[cfg(feature = "metrics")]
1038
                metrics::rejected_blocks(block.header.height, &block.hash()).inc();
×
1039

1040
                self.publish_block_event(BlockEvent::AddBlockErrored { block });
×
1041
                Err(e.into())
×
1042
            },
1043
        }
1044
    }
30✔
1045

1046
    async fn hydrate_block(&mut self, block: Block) -> Result<Arc<Block>, CommsInterfaceError> {
30✔
1047
        let block_hash = block.hash();
30✔
1048
        let block_height = block.header.height;
30✔
1049
        if block.body.inputs().is_empty() {
30✔
1050
            debug!(
23✔
1051
                target: LOG_TARGET,
×
1052
                "Block #{} ({}) contains no inputs so nothing to hydrate",
1053
                block_height,
1054
                block_hash.to_hex(),
×
1055
            );
1056
            return Ok(Arc::new(block));
23✔
1057
        }
7✔
1058

1059
        let timer = Instant::now();
7✔
1060
        let (header, mut inputs, outputs, kernels) = block.dissolve();
7✔
1061

1062
        let db = self.blockchain_db.inner().db_read_access()?;
7✔
1063
        for input in &mut inputs {
9✔
1064
            if !input.is_compact() {
9✔
1065
                continue;
6✔
1066
            }
3✔
1067

1068
            let output_mined_info =
3✔
1069
                db.fetch_output(&input.output_hash())?
3✔
1070
                    .ok_or_else(|| CommsInterfaceError::InvalidFullBlock {
3✔
1071
                        hash: block_hash,
×
1072
                        details: format!("Output {} to be spent does not exist in db", input.output_hash()),
×
1073
                    })?;
×
1074

1075
            input.add_output_data(output_mined_info.output);
3✔
1076
        }
1077
        debug!(
7✔
1078
            target: LOG_TARGET,
×
1079
            "Hydrated block #{} ({}) with {} input(s) in {:.2?}",
1080
            block_height,
1081
            block_hash.to_hex(),
×
1082
            inputs.len(),
×
1083
            timer.elapsed()
×
1084
        );
1085
        let block = Block::new(header, AggregateBody::new(inputs, outputs, kernels));
7✔
1086
        Ok(Arc::new(block))
7✔
1087
    }
30✔
1088

1089
    fn publish_block_event(&self, event: BlockEvent) {
30✔
1090
        if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
30✔
1091
            debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
×
1092
        }
30✔
1093
    }
30✔
1094

1095
    #[cfg(feature = "metrics")]
1096
    async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
28✔
1097
        fn update_target_difficulty(block: &ChainBlock) {
28✔
1098
            match block.header().pow_algo() {
28✔
1099
                PowAlgorithm::Sha3x => {
28✔
1100
                    metrics::target_difficulty_sha()
28✔
1101
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
28✔
1102
                },
28✔
1103
                PowAlgorithm::RandomXM => {
×
1104
                    metrics::target_difficulty_monero_randomx()
×
1105
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
×
1106
                },
×
1107
                PowAlgorithm::RandomXT => {
×
1108
                    metrics::target_difficulty_tari_randomx()
×
1109
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
×
1110
                },
×
1111
                PowAlgorithm::Cuckaroo => {
×
1112
                    metrics::target_difficulty_cuckaroo()
×
1113
                        .set(i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
×
1114
                },
×
1115
            }
1116
        }
28✔
1117

1118
        match block_add_result {
28✔
1119
            BlockAddResult::Ok(block) => {
28✔
1120
                update_target_difficulty(block);
28✔
1121
                self.update_difficulty_indicators(block.height()).await?;
28✔
1122
                #[allow(clippy::cast_possible_wrap)]
1123
                metrics::tip_height().set(block.height() as i64);
28✔
1124
                let utxo_set_size = self.blockchain_db.utxo_count().await?;
28✔
1125
                metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
28✔
1126
                metrics::reorg_blocks_added().set(0);
28✔
1127
                metrics::reorg_blocks_removed().set(0);
28✔
1128
            },
1129
            BlockAddResult::ChainReorg { added, removed } => {
×
1130
                if let Some(fork_height) = added.last().map(|b| b.height()) {
×
1131
                    #[allow(clippy::cast_possible_wrap)]
1132
                    metrics::tip_height().set(fork_height as i64);
×
1133
                    metrics::reorg(fork_height, added.len(), removed.len()).inc();
×
1134
                    #[allow(clippy::cast_possible_wrap)]
1135
                    metrics::reorg_blocks_added().set(added.len() as i64);
×
1136
                    #[allow(clippy::cast_possible_wrap)]
1137
                    metrics::reorg_blocks_removed().set(removed.len() as i64);
×
1138

1139
                    let utxo_set_size = self.blockchain_db.utxo_count().await?;
×
1140
                    metrics::utxo_set_size().set(utxo_set_size.try_into().unwrap_or(i64::MAX));
×
1141
                }
×
1142
                for block in added {
×
1143
                    update_target_difficulty(block);
×
1144
                    self.update_difficulty_indicators(block.height()).await?;
×
1145
                }
1146
            },
1147
            BlockAddResult::OrphanBlock => {
×
1148
                metrics::orphaned_blocks().inc();
×
1149
                metrics::reorg_blocks_added().set(0);
×
1150
                metrics::reorg_blocks_removed().set(0);
×
1151
            },
×
1152
            _ => {},
×
1153
        }
1154
        Ok(())
28✔
1155
    }
28✔
1156

1157
    #[cfg(feature = "metrics")]
1158
    async fn update_difficulty_indicators(&self, tip: u64) -> Result<(), CommsInterfaceError> {
28✔
1159
        // Use canonical height from tip where reorgs are highly unlikely
1160
        if tip <= DIFF_INDICATOR_LAG {
28✔
1161
            // Not enough history yet; clear or skip
1162
            metrics::accumulated_difficulty_indicator().set(0);
28✔
1163
            metrics::target_difficulty_indicator().set(0);
28✔
1164
            metrics::difficulty_indicator_height().set(0);
28✔
1165
            metrics::target_difficulty().set(0);
28✔
1166
            metrics::accumulated_difficulty_exp2().set(0);
28✔
1167
            metrics::accumulated_difficulty_sig53().set(0);
28✔
1168
            metrics::accumulated_difficulty_as_f64().set(0.0);
28✔
1169
            return Ok(());
28✔
1170
        }
×
1171
        let height = tip - DIFF_INDICATOR_LAG;
×
1172
        let chain_header = self.blockchain_db.fetch_chain_header(height).await?;
×
1173

1174
        // Compute indicators in millibits as `logâ‚‚(value) * 1000` to make huge numbers fathomable in a time-series
1175
        // graph with enough granularity
1176
        let acc_diff_milli_bits = metrics::log2_u512(&chain_header.accumulated_data().total_accumulated_difficulty)
×
1177
            .map(metrics::milli_bits)
×
1178
            .unwrap_or(0);
×
1179
        let target_diff_milli_bits =
×
1180
            metrics::log2_u128(u128::from(chain_header.accumulated_data().target_difficulty.as_u64()))
×
1181
                .map(metrics::milli_bits)
×
1182
                .unwrap_or(0);
×
1183
        let (acc_diff_exp2, acc_diff_sig53) =
×
1184
            metrics::u512_exp2_sig53(&chain_header.accumulated_data().total_accumulated_difficulty).unwrap_or((0, 0));
×
1185
        let acc_diff_as_f64 =
×
1186
            metrics::approximate_u512_with_f64(&chain_header.accumulated_data().total_accumulated_difficulty)
×
1187
                .unwrap_or(0.0);
×
1188

1189
        // Publish
1190
        metrics::accumulated_difficulty_indicator().set(acc_diff_milli_bits);
×
1191
        metrics::target_difficulty_indicator().set(target_diff_milli_bits);
×
1192
        #[allow(clippy::cast_possible_wrap)]
1193
        metrics::difficulty_indicator_height().set(height as i64);
×
1194
        #[allow(clippy::cast_possible_wrap)]
1195
        metrics::target_difficulty()
×
1196
            .set(i64::try_from(chain_header.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX));
×
1197
        metrics::accumulated_difficulty_exp2().set(acc_diff_exp2);
×
1198
        metrics::accumulated_difficulty_sig53().set(acc_diff_sig53);
×
1199
        metrics::accumulated_difficulty_as_f64().set(acc_diff_as_f64);
×
1200

1201
        Ok(())
×
1202
    }
28✔
1203

1204
    async fn get_target_difficulty_for_next_block(
4✔
1205
        &self,
4✔
1206
        pow_algo: PowAlgorithm,
4✔
1207
        constants: &ConsensusConstants,
4✔
1208
        current_block_hash: HashOutput,
4✔
1209
    ) -> Result<Difficulty, CommsInterfaceError> {
4✔
1210
        let target_difficulty = self
4✔
1211
            .blockchain_db
4✔
1212
            .fetch_target_difficulty_for_next_block(pow_algo, current_block_hash)
4✔
1213
            .await?;
4✔
1214

1215
        let target = target_difficulty.calculate(
4✔
1216
            constants.min_pow_difficulty(pow_algo),
4✔
1217
            constants.max_pow_difficulty(pow_algo),
4✔
1218
        );
1219
        trace!(target: LOG_TARGET, "Target difficulty {target} for PoW {pow_algo}");
4✔
1220
        Ok(target)
4✔
1221
    }
4✔
1222

1223
    pub async fn get_last_seen_hash(&self) -> Result<FixedHash, CommsInterfaceError> {
×
1224
        self.mempool.get_last_seen_hash().await.map_err(|e| e.into())
×
1225
    }
×
1226
}
1227

1228
impl<B> Clone for InboundNodeCommsHandlers<B> {
1229
    fn clone(&self) -> Self {
155✔
1230
        Self {
155✔
1231
            block_event_sender: self.block_event_sender.clone(),
155✔
1232
            blockchain_db: self.blockchain_db.clone(),
155✔
1233
            mempool: self.mempool.clone(),
155✔
1234
            consensus_manager: self.consensus_manager.clone(),
155✔
1235
            list_of_reconciling_blocks: self.list_of_reconciling_blocks.clone(),
155✔
1236
            outbound_nci: self.outbound_nci.clone(),
155✔
1237
            connectivity: self.connectivity.clone(),
155✔
1238
            randomx_factory: self.randomx_factory.clone(),
155✔
1239
            mempool_last_seen: self.mempool_last_seen.clone(),
155✔
1240
            mempool_sync_timeout: self.mempool_sync_timeout,
155✔
1241
        }
155✔
1242
    }
155✔
1243
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc