• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 15484013348

06 Jun 2025 06:08AM UTC coverage: 72.04% (+0.3%) from 71.789%
15484013348

push

github

web-flow
fix(network-discovery): add back idle event handling (#7194)

Description
---
fix(network-discovery): add back idle event handling

Motivation and Context
---
network discovery was spinning at full speed because the Idle event
transition was removed. Network logs would rotate < 1s.

```
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG NetworkDiscovery::Ready: Peer list contains 759 entries. Current discovery rounds in this cycle: 0.
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG First active round (current_num_rounds = 0) and num_peers (759) >= min_desired_peers (16). Forcing DHT discovery.
 [comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG Selecting 5 random peers for discovery (last round info available: false, new peers in last round: false).
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG No suitable peers found for the forced DHT discovery round (current_num_rounds = 0 path). Transitioning to Idle.
 [comms::dht::network_discovery] [Thread:123190302967360] DEBUG Transition triggered from current state `Ready` by event `Idle`
[comms::dht::network_discovery] [Thread:123190302967360] DEBUG No state transition for event `Idle`. The current state is `Ready`

...instant rinse and repeat...
```

This PR adds the Idle state transition back. Note that the Idle state waits
for 30 minutes, so the state machine should only transition to Idle once all
work is done and sufficient peers have been downloaded.

How Has This Been Tested?
---
Manually - console wallet with empty peer db

What process can a PR reviewer use to test or verify this change?
---

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other... (continued)

3 of 4 new or added lines in 2 files covered. (75.0%)

412 existing lines in 30 files now uncovered.

80882 of 112274 relevant lines covered (72.04%)

242938.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.12
/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
1
// Copyright 2019. The Tari Project
2
//
3
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
// following conditions are met:
5
//
6
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
// disclaimer.
8
//
9
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
// following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
// products derived from this software without specific prior written permission.
14
//
15
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22

23
#[cfg(feature = "metrics")]
24
use std::convert::{TryFrom, TryInto};
25
use std::{cmp::max, collections::HashSet, sync::Arc, time::Instant};
26

27
use log::*;
28
use strum_macros::Display;
29
use tari_common_types::types::{BlockHash, FixedHash, HashOutput};
30
use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId};
31
use tari_utilities::hex::Hex;
32
use tokio::sync::RwLock;
33

34
#[cfg(feature = "metrics")]
35
use crate::base_node::metrics;
36
use crate::{
37
    base_node::comms_interface::{
38
        error::CommsInterfaceError,
39
        local_interface::BlockEventSender,
40
        FetchMempoolTransactionsResponse,
41
        NodeCommsRequest,
42
        NodeCommsResponse,
43
        OutboundNodeCommsInterface,
44
    },
45
    blocks::{Block, BlockBuilder, BlockHeader, BlockHeaderValidationError, ChainBlock, NewBlock, NewBlockTemplate},
46
    chain_storage::{async_db::AsyncBlockchainDb, BlockAddResult, BlockchainBackend, ChainStorageError},
47
    consensus::{ConsensusConstants, ConsensusManager},
48
    mempool::Mempool,
49
    proof_of_work::{
50
        monero_randomx_difficulty,
51
        randomx_factory::RandomXFactory,
52
        sha3x_difficulty,
53
        tari_randomx_difficulty,
54
        Difficulty,
55
        PowAlgorithm,
56
        PowError,
57
    },
58
    transactions::aggregated_body::AggregateBody,
59
    validation::{helpers, tari_rx_vm_key_height, ValidationError},
60
};
61

62
const LOG_TARGET: &str = "c::bn::comms_interface::inbound_handler";
63
const MAX_REQUEST_BY_BLOCK_HASHES: usize = 100;
64
const MAX_REQUEST_BY_KERNEL_EXCESS_SIGS: usize = 100;
65
const MAX_REQUEST_BY_UTXO_HASHES: usize = 100;
66
const MAX_MEMPOOL_TIMEOUT: u64 = 150;
67

68
/// Events that can be published on the Validated Block Event Stream
69
/// Broadcast is to notify subscribers if this is a valid propagated block event
70
#[derive(Debug, Clone, Display)]
71
pub enum BlockEvent {
72
    ValidBlockAdded(Arc<Block>, BlockAddResult),
73
    AddBlockValidationFailed {
74
        block: Arc<Block>,
75
        source_peer: Option<NodeId>,
76
    },
77
    AddBlockErrored {
78
        block: Arc<Block>,
79
    },
80
    BlockSyncComplete(Arc<ChainBlock>, u64),
81
    BlockSyncRewind(Vec<Arc<ChainBlock>>),
82
}
83

84
/// The InboundNodeCommsInterface is used to handle all received inbound requests from remote nodes.
85
pub struct InboundNodeCommsHandlers<B> {
86
    block_event_sender: BlockEventSender,
87
    blockchain_db: AsyncBlockchainDb<B>,
88
    mempool: Mempool,
89
    consensus_manager: ConsensusManager,
90
    list_of_reconciling_blocks: Arc<RwLock<HashSet<HashOutput>>>,
91
    outbound_nci: OutboundNodeCommsInterface,
92
    connectivity: ConnectivityRequester,
93
    randomx_factory: RandomXFactory,
94
}
95

96
impl<B> InboundNodeCommsHandlers<B>
97
where B: BlockchainBackend + 'static
98
{
99
    /// Construct a new InboundNodeCommsInterface.
100
    pub fn new(
56✔
101
        block_event_sender: BlockEventSender,
56✔
102
        blockchain_db: AsyncBlockchainDb<B>,
56✔
103
        mempool: Mempool,
56✔
104
        consensus_manager: ConsensusManager,
56✔
105
        outbound_nci: OutboundNodeCommsInterface,
56✔
106
        connectivity: ConnectivityRequester,
56✔
107
        randomx_factory: RandomXFactory,
56✔
108
    ) -> Self {
56✔
109
        Self {
56✔
110
            block_event_sender,
56✔
111
            blockchain_db,
56✔
112
            mempool,
56✔
113
            consensus_manager,
56✔
114
            list_of_reconciling_blocks: Arc::new(RwLock::new(HashSet::new())),
56✔
115
            outbound_nci,
56✔
116
            connectivity,
56✔
117
            randomx_factory,
56✔
118
        }
56✔
119
    }
56✔
120

121
    /// Handle inbound node comms requests from remote nodes and local services.
122
    #[allow(clippy::too_many_lines)]
123
    pub async fn handle_request(&self, request: NodeCommsRequest) -> Result<NodeCommsResponse, CommsInterfaceError> {
100✔
124
        trace!(target: LOG_TARGET, "Handling remote request {}", request);
100✔
125
        match request {
100✔
126
            NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata(
127
                self.blockchain_db.get_chain_metadata().await?,
82✔
128
            )),
129
            NodeCommsRequest::GetTargetDifficultyNextBlock(algo) => {
×
130
                let header = self.blockchain_db.fetch_tip_header().await?;
×
131
                let constants = self.consensus_manager.consensus_constants(header.header().height);
×
132
                let target_difficulty = self
×
133
                    .get_target_difficulty_for_next_block(algo, constants, *header.hash())
×
134
                    .await?;
×
135
                Ok(NodeCommsResponse::TargetDifficulty(target_difficulty))
×
136
            },
137
            NodeCommsRequest::FetchHeaders(range) => {
1✔
138
                let headers = self.blockchain_db.fetch_chain_headers(range).await?;
1✔
139
                Ok(NodeCommsResponse::BlockHeaders(headers))
1✔
140
            },
141
            NodeCommsRequest::FetchHeadersByHashes(block_hashes) => {
×
142
                if block_hashes.len() > MAX_REQUEST_BY_BLOCK_HASHES {
×
143
                    return Err(CommsInterfaceError::InvalidRequest {
×
144
                        request: "FetchHeadersByHashes",
×
145
                        details: format!(
×
146
                            "Exceeded maximum block hashes request (max: {}, got:{})",
×
147
                            MAX_REQUEST_BY_BLOCK_HASHES,
×
148
                            block_hashes.len()
×
149
                        ),
×
150
                    });
×
151
                }
×
152
                let mut block_headers = Vec::with_capacity(block_hashes.len());
×
153
                for block_hash in block_hashes {
×
154
                    let block_hex = block_hash.to_hex();
×
155
                    match self.blockchain_db.fetch_chain_header_by_block_hash(block_hash).await? {
×
156
                        Some(block_header) => {
×
157
                            block_headers.push(block_header);
×
158
                        },
×
159
                        None => {
160
                            error!(target: LOG_TARGET, "Could not fetch headers with hashes:{}", block_hex);
×
161
                            return Err(CommsInterfaceError::InternalError(format!(
×
162
                                "Could not fetch headers with hashes:{}",
×
163
                                block_hex
×
164
                            )));
×
165
                        },
166
                    }
167
                }
168
                Ok(NodeCommsResponse::BlockHeaders(block_headers))
×
169
            },
170
            NodeCommsRequest::FetchMatchingUtxos(utxo_hashes) => {
1✔
171
                let mut res = Vec::with_capacity(utxo_hashes.len());
1✔
172
                for (output, spent) in (self
1✔
173
                    .blockchain_db
1✔
174
                    .fetch_outputs_with_spend_status_at_tip(utxo_hashes)
1✔
175
                    .await?)
1✔
176
                    .into_iter()
1✔
177
                    .flatten()
1✔
178
                {
179
                    if !spent {
1✔
180
                        res.push(output);
1✔
181
                    }
1✔
182
                }
183
                Ok(NodeCommsResponse::TransactionOutputs(res))
1✔
184
            },
185
            NodeCommsRequest::FetchMatchingBlocks { range, compact } => {
3✔
186
                let blocks = self.blockchain_db.fetch_blocks(range, compact).await?;
3✔
187
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
3✔
188
            },
189
            NodeCommsRequest::FetchBlocksByKernelExcessSigs(excess_sigs) => {
×
190
                if excess_sigs.len() > MAX_REQUEST_BY_KERNEL_EXCESS_SIGS {
×
191
                    return Err(CommsInterfaceError::InvalidRequest {
×
192
                        request: "FetchBlocksByKernelExcessSigs",
×
193
                        details: format!(
×
194
                            "Exceeded maximum number of kernel excess sigs in request (max: {}, got:{})",
×
195
                            MAX_REQUEST_BY_KERNEL_EXCESS_SIGS,
×
196
                            excess_sigs.len()
×
197
                        ),
×
198
                    });
×
199
                }
×
200
                let mut blocks = Vec::with_capacity(excess_sigs.len());
×
201
                for sig in excess_sigs {
×
202
                    let sig_hex = sig.get_signature().to_hex();
×
203
                    debug!(
×
204
                        target: LOG_TARGET,
×
205
                        "A peer has requested a block with kernel with sig {}", sig_hex
×
206
                    );
207
                    match self.blockchain_db.fetch_block_with_kernel(sig).await {
×
208
                        Ok(Some(block)) => blocks.push(block),
×
209
                        Ok(None) => warn!(
×
210
                            target: LOG_TARGET,
×
211
                            "Could not provide requested block containing kernel with sig {} to peer because not \
×
212
                             stored",
×
213
                            sig_hex
214
                        ),
215
                        Err(e) => warn!(
×
216
                            target: LOG_TARGET,
×
217
                            "Could not provide requested block containing kernel with sig {} to peer because: {}",
×
218
                            sig_hex,
×
219
                            e.to_string()
×
220
                        ),
221
                    }
222
                }
223
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
×
224
            },
225
            NodeCommsRequest::FetchBlocksByUtxos(commitments) => {
×
226
                if commitments.len() > MAX_REQUEST_BY_UTXO_HASHES {
×
227
                    return Err(CommsInterfaceError::InvalidRequest {
×
228
                        request: "FetchBlocksByUtxos",
×
229
                        details: format!(
×
230
                            "Exceeded maximum number of utxo hashes in request (max: {}, got:{})",
×
231
                            MAX_REQUEST_BY_UTXO_HASHES,
×
232
                            commitments.len()
×
233
                        ),
×
234
                    });
×
235
                }
×
236
                let mut blocks = Vec::with_capacity(commitments.len());
×
237
                for commitment in commitments {
×
238
                    let commitment_hex = commitment.to_hex();
×
239
                    debug!(
×
240
                        target: LOG_TARGET,
×
241
                        "A peer has requested a block with commitment {}", commitment_hex,
×
242
                    );
243
                    match self.blockchain_db.fetch_block_with_utxo(commitment).await {
×
244
                        Ok(Some(block)) => blocks.push(block),
×
245
                        Ok(None) => warn!(
×
246
                            target: LOG_TARGET,
×
247
                            "Could not provide requested block with commitment {} to peer because not stored",
×
248
                            commitment_hex,
249
                        ),
250
                        Err(e) => warn!(
×
251
                            target: LOG_TARGET,
×
252
                            "Could not provide requested block with commitment {} to peer because: {}",
×
253
                            commitment_hex,
×
254
                            e.to_string()
×
255
                        ),
256
                    }
257
                }
258
                Ok(NodeCommsResponse::HistoricalBlocks(blocks))
×
259
            },
260
            NodeCommsRequest::GetHeaderByHash(hash) => {
×
261
                let header = self.blockchain_db.fetch_chain_header_by_block_hash(hash).await?;
×
262
                Ok(NodeCommsResponse::BlockHeader(header))
×
263
            },
264
            NodeCommsRequest::GetBlockByHash(hash) => {
×
265
                let block = self.blockchain_db.fetch_block_by_hash(hash, false).await?;
×
266
                Ok(NodeCommsResponse::HistoricalBlock(Box::new(block)))
×
267
            },
268
            NodeCommsRequest::GetNewBlockTemplate(request) => {
3✔
269
                let best_block_header = self.blockchain_db.fetch_tip_header().await?;
3✔
270
                let mut last_seen_hash = self.mempool.get_last_seen_hash().await?;
3✔
271
                let mut is_mempool_synced = false;
3✔
272
                let start = Instant::now();
3✔
273
                // this will wait a max of 150ms by default before returning anyway with a potential broken template
274
                // We need to ensure the mempool has seen the latest base node height before we can be confident the
275
                // template is correct
276
                while !is_mempool_synced && start.elapsed().as_millis() < MAX_MEMPOOL_TIMEOUT.into() {
6✔
277
                    if best_block_header.hash() == &last_seen_hash || last_seen_hash == FixedHash::default() {
3✔
278
                        is_mempool_synced = true;
3✔
279
                    } else {
3✔
280
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
×
281
                        last_seen_hash = self.mempool.get_last_seen_hash().await?;
×
282
                    }
283
                }
284

285
                if !is_mempool_synced {
3✔
286
                    warn!(
×
287
                        target: LOG_TARGET,
×
288
                        "Mempool out of sync - last seen hash '{}' does not match the tip hash '{}'. This condition \
×
289
                         should auto correct with the next block template request",
×
290
                        last_seen_hash, best_block_header.hash()
×
291
                    );
292
                }
3✔
293
                let mut header = BlockHeader::from_previous(best_block_header.header());
3✔
294
                let constants = self.consensus_manager.consensus_constants(header.height);
3✔
295
                header.version = constants.blockchain_version();
3✔
296
                header.pow.pow_algo = request.algo;
3✔
297

3✔
298
                let constants_weight = constants.max_block_transaction_weight();
3✔
299
                let asking_weight = if request.max_weight > constants_weight || request.max_weight == 0 {
3✔
300
                    constants_weight
3✔
301
                } else {
302
                    request.max_weight
×
303
                };
304

305
                debug!(
3✔
306
                    target: LOG_TARGET,
×
307
                    "Fetching transactions with a maximum weight of {} for the template", asking_weight
×
308
                );
309
                let transactions = self
3✔
310
                    .mempool
3✔
311
                    .retrieve(asking_weight)
3✔
312
                    .await?
3✔
313
                    .into_iter()
3✔
314
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
8✔
315
                    .collect::<Vec<_>>();
3✔
316

3✔
317
                debug!(
3✔
318
                    target: LOG_TARGET,
×
319
                    "Adding {} transaction(s) to new block template",
×
320
                    transactions.len(),
×
321
                );
322

323
                let prev_hash = header.prev_hash;
3✔
324
                let height = header.height;
3✔
325

3✔
326
                let block = header.into_builder().with_transactions(transactions).build();
3✔
327
                let block_hash = block.hash();
3✔
328
                let block_template = NewBlockTemplate::from_block(
3✔
329
                    block,
3✔
330
                    self.get_target_difficulty_for_next_block(request.algo, constants, prev_hash)
3✔
331
                        .await?,
3✔
332
                    self.consensus_manager.get_block_reward_at(height),
3✔
333
                    is_mempool_synced,
3✔
334
                )?;
×
335

336
                debug!(target: LOG_TARGET,
3✔
337
                    "New block template requested and prepared at height: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
×
338
                    block_template.header.height,
×
339
                    block_template.target_difficulty,
×
340
                    block_hash.to_hex(),
×
341
                    block_template
×
342
                        .body
×
343
                        .calculate_weight(constants.transaction_weight_params())
×
344
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
×
345
                    block_template.body.to_counts_string()
×
346
                );
347

348
                Ok(NodeCommsResponse::NewBlockTemplate(block_template))
3✔
349
            },
350
            NodeCommsRequest::GetNewBlock(block_template) => {
3✔
351
                let height = block_template.header.height;
3✔
352
                let target_difficulty = block_template.target_difficulty;
3✔
353
                let block = self.blockchain_db.prepare_new_block(block_template).await?;
3✔
354
                let constants = self.consensus_manager.consensus_constants(block.header.height);
3✔
355
                debug!(target: LOG_TARGET,
3✔
356
                    "Prepared block: #{}, target difficulty: {}, block hash: `{}`, weight: {}, {}",
×
357
                    height,
×
358
                    target_difficulty,
×
359
                    block.hash().to_hex(),
×
360
                    block
×
361
                        .body
×
362
                        .calculate_weight(constants.transaction_weight_params())
×
363
                        .map_err(|e| CommsInterfaceError::InternalError(e.to_string()))?,
×
364
                    block.body.to_counts_string()
×
365
                );
366
                Ok(NodeCommsResponse::NewBlock {
3✔
367
                    success: true,
3✔
368
                    error: None,
3✔
369
                    block: Some(block),
3✔
370
                })
3✔
371
            },
372
            NodeCommsRequest::GetBlockFromAllChains(hash) => {
3✔
373
                let block_hex = hash.to_hex();
3✔
374
                debug!(
3✔
375
                    target: LOG_TARGET,
×
376
                    "A peer has requested a block with hash {}", block_hex
×
377
                );
378

379
                #[allow(clippy::blocks_in_conditions)]
380
                let maybe_block = match self
3✔
381
                    .blockchain_db
3✔
382
                    .fetch_block_by_hash(hash, true)
3✔
383
                    .await
3✔
384
                    .unwrap_or_else(|e| {
3✔
385
                        warn!(
×
386
                            target: LOG_TARGET,
×
387
                            "Could not provide requested block {} to peer because: {}",
×
388
                            block_hex,
×
389
                            e.to_string()
×
390
                        );
391

392
                        None
×
393
                    }) {
3✔
UNCOV
394
                    None => self.blockchain_db.fetch_orphan(hash).await.map_or_else(
×
UNCOV
395
                        |e| {
×
UNCOV
396
                            warn!(
×
397
                                target: LOG_TARGET,
×
398
                                "Could not provide requested block {} to peer because: {}", block_hex, e,
×
399
                            );
400

UNCOV
401
                            None
×
UNCOV
402
                        },
×
UNCOV
403
                        Some,
×
UNCOV
404
                    ),
×
405
                    Some(block) => Some(block.into_block()),
3✔
406
                };
407

408
                Ok(NodeCommsResponse::Block(Box::new(maybe_block)))
3✔
409
            },
410
            NodeCommsRequest::FetchKernelByExcessSig(signature) => {
1✔
411
                let kernels = match self.blockchain_db.fetch_kernel_by_excess_sig(signature).await {
1✔
412
                    Ok(Some((kernel, _))) => vec![kernel],
1✔
413
                    Ok(None) => vec![],
×
414
                    Err(err) => {
×
415
                        error!(target: LOG_TARGET, "Could not fetch kernel {}", err.to_string());
×
416
                        return Err(err.into());
×
417
                    },
418
                };
419

420
                Ok(NodeCommsResponse::TransactionKernels(kernels))
1✔
421
            },
422
            NodeCommsRequest::FetchMempoolTransactionsByExcessSigs { excess_sigs } => {
3✔
423
                let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
3✔
424
                Ok(NodeCommsResponse::FetchMempoolTransactionsByExcessSigsResponse(
3✔
425
                    FetchMempoolTransactionsResponse {
3✔
426
                        transactions,
3✔
427
                        not_found,
3✔
428
                    },
3✔
429
                ))
3✔
430
            },
431
            NodeCommsRequest::FetchValidatorNodesKeys { height } => {
×
432
                let active_validator_nodes = self.blockchain_db.fetch_active_validator_nodes(height).await?;
×
433
                Ok(NodeCommsResponse::FetchValidatorNodesKeysResponse(
×
434
                    active_validator_nodes,
×
435
                ))
×
436
            },
437
            NodeCommsRequest::GetShardKey { height, public_key } => {
×
438
                let shard_key = self.blockchain_db.get_shard_key(height, public_key).await?;
×
439
                Ok(NodeCommsResponse::GetShardKeyResponse(shard_key))
×
440
            },
441
            NodeCommsRequest::FetchTemplateRegistrations {
442
                start_height,
×
443
                end_height,
×
444
            } => {
445
                let template_registrations = self
×
446
                    .blockchain_db
×
447
                    .fetch_template_registrations(start_height..=end_height)
×
448
                    .await?;
×
449
                Ok(NodeCommsResponse::FetchTemplateRegistrationsResponse(
×
450
                    template_registrations,
×
451
                ))
×
452
            },
453
            NodeCommsRequest::FetchUnspentUtxosInBlock { block_hash } => {
×
454
                let utxos = self.blockchain_db.fetch_outputs_in_block(block_hash).await?;
×
455
                Ok(NodeCommsResponse::TransactionOutputs(utxos))
×
456
            },
457
        }
458
    }
100✔
459

460
    /// Handles a `NewBlock` message. Only a single `NewBlock` message can be handled at once to prevent extraneous
461
    /// requests for the full block.
462
    /// This may (asynchronously) block until the other request(s) complete or time out and so should typically be
463
    /// executed in a dedicated task.
464
    pub async fn handle_new_block_message(
20✔
465
        &mut self,
20✔
466
        new_block: NewBlock,
20✔
467
        source_peer: NodeId,
20✔
468
    ) -> Result<(), CommsInterfaceError> {
20✔
469
        let block_hash = new_block.header.hash();
20✔
470

20✔
471
        if self.blockchain_db.inner().is_add_block_disabled() {
20✔
472
            info!(
×
473
                target: LOG_TARGET,
×
474
                "Ignoring block message ({}) because add_block is locked",
×
475
                block_hash.to_hex()
×
476
            );
477
            return Ok(());
×
478
        }
20✔
479

20✔
480
        // Lets check if the block exists before we try and ask for a complete block
20✔
481
        if self.check_exists_and_not_bad_block(block_hash).await? {
20✔
482
            return Ok(());
×
483
        }
20✔
484

20✔
485
        // lets check that the difficulty at least matches 50% of the tip header. The max difficulty drop is 16%, thus
20✔
486
        // 50% is way more than that and in order to attack the node, you need 50% of the mining power. We cannot check
20✔
487
        // the target difficulty as orphan blocks dont have a target difficulty. All we care here is that bad
20✔
488
        // blocks are not free to make, and that they are more expensive to make then they are to validate. As
20✔
489
        // soon as a block can be linked to the main chain, a proper full proof of work check will
20✔
490
        // be done before any other validation.
20✔
491
        self.check_min_block_difficulty(&new_block).await?;
20✔
492

493
        {
494
            // we use a double lock to make sure we can only reconcile one unique block at a time. We may receive the
495
            // same block from multiple peer near simultaneously. We should only reconcile each unique block once.
496
            let read_lock = self.list_of_reconciling_blocks.read().await;
20✔
497
            if read_lock.contains(&block_hash) {
20✔
498
                debug!(
×
499
                    target: LOG_TARGET,
×
500
                    "Block with hash `{}` is already being reconciled",
×
501
                    block_hash.to_hex()
×
502
                );
503
                return Ok(());
×
504
            }
20✔
505
        }
506
        {
507
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
20✔
508
            if self.check_exists_and_not_bad_block(block_hash).await? {
20✔
509
                return Ok(());
×
510
            }
20✔
511

20✔
512
            if !write_lock.insert(block_hash) {
20✔
513
                debug!(
×
514
                    target: LOG_TARGET,
×
515
                    "Block with hash `{}` is already being reconciled",
×
516
                    block_hash.to_hex()
×
517
                );
518
                return Ok(());
×
519
            }
20✔
520
        }
20✔
521

20✔
522
        debug!(
20✔
523
            target: LOG_TARGET,
×
524
            "Block with hash `{}` is unknown. Constructing block from known mempool transactions / requesting missing \
×
525
             transactions from peer '{}'.",
×
526
            block_hash.to_hex(),
×
527
            source_peer
528
        );
529

530
        let result = self.reconcile_and_add_block(source_peer.clone(), new_block).await;
20✔
531

532
        {
533
            let mut write_lock = self.list_of_reconciling_blocks.write().await;
20✔
534
            write_lock.remove(&block_hash);
20✔
535
        }
20✔
536
        result?;
20✔
537
        Ok(())
18✔
538
    }
20✔
539

540
    /// Checks that the proof of work achieved by `new_block` meets a minimum difficulty floor.
    ///
    /// The floor is the greater of:
    /// - the consensus minimum for the block's PoW algorithm, and
    /// - half the target difficulty of the most recent chain header that used the same PoW
    ///   algorithm (found by walking back from the tip; the search stops at height 0).
    ///
    /// Returns `Err(CommsInterfaceError::InvalidBlockHeader)` if the achieved difficulty is
    /// below the computed floor, `Ok(())` otherwise.
    async fn check_min_block_difficulty(&self, new_block: &NewBlock) -> Result<(), CommsInterfaceError> {
        let constants = self.consensus_manager.consensus_constants(new_block.header.height);
        let gen_hash = *self.consensus_manager.get_genesis_block().hash();
        // Start with the consensus minimum; the loop below may raise this.
        let mut min_difficulty = constants.min_pow_difficulty(new_block.header.pow.pow_algo);
        let mut header = self.blockchain_db.fetch_last_chain_header().await?;
        loop {
            if new_block.header.pow_algo() == header.header().pow_algo() {
                // Matching-algo header found: floor = max(target/2, consensus minimum).
                // checked_div_u64 falls back to the consensus minimum on failure.
                min_difficulty = max(
                    header
                        .accumulated_data()
                        .target_difficulty
                        .checked_div_u64(2)
                        .unwrap_or(min_difficulty),
                    min_difficulty,
                );
                break;
            }
            if header.height() == 0 {
                break;
            }
            // we have not reached gen block, and the pow algo does not match, so lets go further back
            header = self
                .blockchain_db
                .fetch_chain_header(header.height().saturating_sub(1))
                .await?;
        }
        let achieved = match new_block.header.pow_algo() {
            PowAlgorithm::RandomXM => monero_randomx_difficulty(
                &new_block.header,
                &self.randomx_factory,
                &gen_hash,
                &self.consensus_manager,
            )?,
            PowAlgorithm::Sha3x => sha3x_difficulty(&new_block.header)?,
            PowAlgorithm::RandomXT => {
                // NOTE(review): `header` here is whatever header the loop above stopped on
                // (the last same-algo header, or genesis) — presumably intentional as the
                // VM key anchor height; confirm against `tari_rx_vm_key_height` semantics.
                let vm_key = *self
                    .blockchain_db
                    .fetch_chain_header(tari_rx_vm_key_height(header.height()))
                    .await?
                    .hash();
                tari_randomx_difficulty(&new_block.header, &self.randomx_factory, &vm_key)?
            },
        };
        if achieved < min_difficulty {
            return Err(CommsInterfaceError::InvalidBlockHeader(
                BlockHeaderValidationError::ProofOfWorkError(PowError::AchievedDifficultyBelowMin),
            ));
        }
        Ok(())
    }
20✔
590

591
    async fn check_exists_and_not_bad_block(&self, block: FixedHash) -> Result<bool, CommsInterfaceError> {
40✔
592
        if self.blockchain_db.chain_header_or_orphan_exists(block).await? {
40✔
593
            debug!(
×
594
                target: LOG_TARGET,
×
595
                "Block with hash `{}` already stored",
×
596
                block.to_hex()
×
597
            );
598
            return Ok(true);
×
599
        }
40✔
600
        let (is_bad_block, reason) = self.blockchain_db.bad_block_exists(block).await?;
40✔
601
        if is_bad_block {
40✔
602
            debug!(
×
603
                target: LOG_TARGET,
×
604
                "Block with hash `{}` already validated as a bad block due to `{}`",
×
605
                block.to_hex(), reason
×
606
            );
607
            return Err(CommsInterfaceError::ChainStorageError(
×
608
                ChainStorageError::ValidationError {
×
609
                    source: ValidationError::BadBlockFound {
×
610
                        hash: block.to_hex(),
×
611
                        reason,
×
612
                    },
×
613
                },
×
614
            ));
×
615
        }
40✔
616
        Ok(false)
40✔
617
    }
40✔
618

619
    async fn reconcile_and_add_block(
20✔
620
        &mut self,
20✔
621
        source_peer: NodeId,
20✔
622
        new_block: NewBlock,
20✔
623
    ) -> Result<(), CommsInterfaceError> {
20✔
624
        let block = self.reconcile_block(source_peer.clone(), new_block).await?;
20✔
625
        self.handle_block(block, Some(source_peer)).await?;
20✔
626
        Ok(())
18✔
627
    }
20✔
628

629
    /// Reconstructs a full [`Block`] from a compact [`NewBlock`] announcement.
    ///
    /// Strategy, in order:
    /// 1. If the compact block carries no kernel excess signatures, it is coinbase-only and can
    ///    be built locally without contacting anyone.
    /// 2. If the block does not build on our current tip, fetch the complete block from the peer
    ///    (mempool state / MMR root calculation are only valid on the tip).
    /// 3. Otherwise, pull known transactions from the mempool, request any missing ones from the
    ///    peer, and fall back to fetching the full block if the peer cannot supply them all or
    ///    if the reconstructed block fails the MMR-root sanity check.
    #[allow(clippy::too_many_lines)]
    async fn reconcile_block(
        &mut self,
        source_peer: NodeId,
        new_block: NewBlock,
    ) -> Result<Block, CommsInterfaceError> {
        let NewBlock {
            header,
            coinbase_kernels,
            coinbase_outputs,
            kernel_excess_sigs: excess_sigs,
        } = new_block;
        // If the block is empty, we dont have to ask for the block, as we already have the full block available
        // to us.
        if excess_sigs.is_empty() {
            let block = BlockBuilder::new(header.version)
                .add_outputs(coinbase_outputs)
                .add_kernels(coinbase_kernels)
                .with_header(header)
                .build();
            return Ok(block);
        }

        let block_hash = header.hash();
        // We check the current tip and orphan status of the block because we cannot guarantee that mempool state is
        // correct and the mmr root calculation is only valid if the block is building on the tip.
        let current_meta = self.blockchain_db.get_chain_metadata().await?;
        if header.prev_hash != *current_meta.best_block_hash() {
            debug!(
                target: LOG_TARGET,
                "Orphaned block #{}: ({}), current tip is: #{} ({}). We need to fetch the complete block from peer: \
                 ({})",
                header.height,
                block_hash.to_hex(),
                current_meta.best_block_height(),
                current_meta.best_block_hash().to_hex(),
                source_peer,
            );
            #[allow(clippy::cast_possible_wrap)]
            #[cfg(feature = "metrics")]
            metrics::compact_block_tx_misses(header.height).set(excess_sigs.len() as i64);
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
            return Ok(block);
        }

        // We know that the block is neither and orphan or a coinbase, so lets ask our mempool for the transactions
        let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
        // Mempool hands back Arc-wrapped transactions; clone out owned copies for the builder.
        let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

        #[allow(clippy::cast_possible_wrap)]
        #[cfg(feature = "metrics")]
        metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);

        let mut builder = BlockBuilder::new(header.version)
            .add_outputs(coinbase_outputs)
            .add_kernels(coinbase_kernels)
            .with_transactions(known_transactions);

        if missing_excess_sigs.is_empty() {
            debug!(
                target: LOG_TARGET,
                "All transactions for block #{} ({}) found in mempool",
                header.height,
                block_hash.to_hex()
            );
        } else {
            debug!(
                target: LOG_TARGET,
                "Requesting {} unknown transaction(s) from peer '{}'.",
                missing_excess_sigs.len(),
                source_peer
            );

            let FetchMempoolTransactionsResponse {
                transactions,
                not_found,
            } = self
                .outbound_nci
                .request_transactions_by_excess_sig(source_peer.clone(), missing_excess_sigs)
                .await?;

            // Add returned transactions to unconfirmed pool
            if !transactions.is_empty() {
                self.mempool.insert_all(transactions.clone()).await?;
            }

            if !not_found.is_empty() {
                warn!(
                    target: LOG_TARGET,
                    "Peer {} was not able to return all transactions for block #{} ({}). {} transaction(s) not found. \
                     Requesting full block.",
                    source_peer,
                    header.height,
                    block_hash.to_hex(),
                    not_found.len()
                );

                #[cfg(feature = "metrics")]
                metrics::compact_block_full_misses(header.height).inc();
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
                return Ok(block);
            }

            // Avoid cloning where we hold the only Arc reference to the transaction.
            builder = builder.with_transactions(
                transactions
                    .into_iter()
                    .map(|tx| Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()))
                    .collect(),
            );
        }

        // NB: Add the header last because `with_transactions` etc updates the current header, but we have the final one
        // already
        builder = builder.with_header(header.clone());
        let block = builder.build();

        // Perform a sanity check on the reconstructed block, if the MMR roots don't match then it's possible one or
        // more transactions in our mempool had the same excess/signature for a *different* transaction.
        // This is extremely unlikely, but still possible. In case of a mismatch, request the full block from the peer.
        let (block, mmr_roots) = match self.blockchain_db.calculate_mmr_roots(block).await {
            Err(_) => {
                let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
                return Ok(block);
            },
            Ok(v) => v,
        };
        if let Err(e) = helpers::check_mmr_roots(&header, &mmr_roots) {
            warn!(
                target: LOG_TARGET,
                "Reconstructed block #{} ({}) failed MMR check validation!. Requesting full block. Error: {}",
                header.height,
                block_hash.to_hex(),
                e,
            );

            #[cfg(feature = "metrics")]
            metrics::compact_block_mmr_mismatch(header.height).inc();
            let block = self.request_full_block_from_peer(source_peer, block_hash).await?;
            return Ok(block);
        }

        Ok(block)
    }
20✔
772

773
    async fn request_full_block_from_peer(
3✔
774
        &mut self,
3✔
775
        source_peer: NodeId,
3✔
776
        block_hash: BlockHash,
3✔
777
    ) -> Result<Block, CommsInterfaceError> {
3✔
778
        match self
3✔
779
            .outbound_nci
3✔
780
            .request_blocks_by_hashes_from_peer(block_hash, Some(source_peer.clone()))
3✔
781
            .await
3✔
782
        {
783
            Ok(Some(block)) => Ok(block),
3✔
784
            Ok(None) => {
UNCOV
785
                debug!(
×
786
                    target: LOG_TARGET,
×
787
                    "Peer `{}` failed to return the block that was requested.", source_peer
×
788
                );
UNCOV
789
                Err(CommsInterfaceError::InvalidPeerResponse(format!(
×
UNCOV
790
                    "Invalid response from peer `{}`: Peer failed to provide the block that was propagated",
×
UNCOV
791
                    source_peer
×
UNCOV
792
                )))
×
793
            },
794
            Err(CommsInterfaceError::UnexpectedApiResponse) => {
795
                debug!(
×
796
                    target: LOG_TARGET,
×
797
                    "Peer `{}` sent unexpected API response.", source_peer
×
798
                );
799
                Err(CommsInterfaceError::UnexpectedApiResponse)
×
800
            },
801
            Err(e) => Err(e),
×
802
        }
803
    }
3✔
804

805
    /// Handle inbound blocks from remote nodes and local services.
    ///
    /// Hydrates the block (resolving compact inputs from the db), attempts to add it to the
    /// blockchain, publishes the outcome on the block event stream and, on success, propagates
    /// the block to the network (excluding the peer it came from).
    ///
    /// ## Arguments
    /// block - the block to store
    /// source_peer - the peer that sent this new block message, or None if the block was generated by a local miner
    ///
    /// ## Returns
    /// The hash of the handled block on success; the storage/validation error otherwise.
    pub async fn handle_block(
        &mut self,
        block: Block,
        source_peer: Option<NodeId>,
    ) -> Result<BlockHash, CommsInterfaceError> {
        let block_hash = block.hash();
        let block_height = block.header.height;

        info!(
            target: LOG_TARGET,
            "Block #{} ({}) received from {}",
            block_height,
            block_hash.to_hex(),
            source_peer
                .as_ref()
                .map(|p| format!("remote peer: {}", p))
                .unwrap_or_else(|| "local services".to_string())
        );
        debug!(target: LOG_TARGET, "Incoming block: {}", block);
        let timer = Instant::now();
        // Resolve compact inputs against the db before attempting to add the block.
        let block = self.hydrate_block(block).await?;

        let add_block_result = self.blockchain_db.add_block(block.clone()).await;
        // Create block event on block event stream
        match add_block_result {
            Ok(block_add_result) => {
                debug!(
                    target: LOG_TARGET,
                    "Block #{} ({}) added ({}) to blockchain in {:.2?}",
                    block_height,
                    block_hash.to_hex(),
                    block_add_result,
                    timer.elapsed()
                );

                // Only re-broadcast blocks that changed our chain; existing blocks and
                // orphans are not propagated.
                let should_propagate = match &block_add_result {
                    BlockAddResult::Ok(_) => true,
                    BlockAddResult::BlockExists => false,
                    BlockAddResult::OrphanBlock => false,
                    BlockAddResult::ChainReorg { .. } => true,
                };

                #[cfg(feature = "metrics")]
                self.update_block_result_metrics(&block_add_result).await?;

                self.publish_block_event(BlockEvent::ValidBlockAdded(block.clone(), block_add_result));

                if should_propagate {
                    debug!(
                        target: LOG_TARGET,
                        "Propagate block ({}) to network.",
                        block_hash.to_hex()
                    );
                    // Do not send the block back to the peer that gave it to us.
                    let exclude_peers = source_peer.into_iter().collect();
                    let new_block_msg = NewBlock::from(&*block);
                    // Propagation failure is logged but does not fail block handling.
                    if let Err(e) = self.outbound_nci.propagate_block(new_block_msg, exclude_peers).await {
                        warn!(
                            target: LOG_TARGET,
                            "Failed to propagate block ({}) to network: {}.",
                            block_hash.to_hex(), e
                        );
                    }
                }
                Ok(block_hash)
            },

            Err(e @ ChainStorageError::ValidationError { .. }) => {
                #[cfg(feature = "metrics")]
                {
                    let block_hash = block.hash();
                    metrics::rejected_blocks(block.header.height, &block_hash).inc();
                }
                warn!(
                    target: LOG_TARGET,
                    "Peer {} sent an invalid block: {}",
                    source_peer
                        .as_ref()
                        .map(ToString::to_string)
                        .unwrap_or_else(|| "<local request>".to_string()),
                    e
                );
                self.publish_block_event(BlockEvent::AddBlockValidationFailed { block, source_peer });
                Err(e.into())
            },

            Err(e) => {
                // Non-validation storage failures: record the rejection and surface the error.
                #[cfg(feature = "metrics")]
                metrics::rejected_blocks(block.header.height, &block.hash()).inc();

                self.publish_block_event(BlockEvent::AddBlockErrored { block });
                Err(e.into())
            },
        }
    }
32✔
905

906
    /// Fills in the output data of any compact inputs in `block` from the local database.
    ///
    /// Blocks received over the network may carry "compact" inputs that reference a spent
    /// output only by hash. This looks each such output up in the db and copies its full data
    /// onto the input. Blocks with no inputs are returned unchanged.
    ///
    /// Returns `CommsInterfaceError::InvalidFullBlock` if a referenced output is not in the db.
    async fn hydrate_block(&mut self, block: Block) -> Result<Arc<Block>, CommsInterfaceError> {
        let block_hash = block.hash();
        let block_height = block.header.height;
        if block.body.inputs().is_empty() {
            debug!(
                target: LOG_TARGET,
                "Block #{} ({}) contains no inputs so nothing to hydrate",
                block_height,
                block_hash.to_hex(),
            );
            return Ok(Arc::new(block));
        }

        let timer = Instant::now();
        // Take the block apart so the inputs can be mutated in place.
        let (header, mut inputs, outputs, kernels) = block.dissolve();

        // Hold a single read-access handle for all lookups in the loop below.
        let db = self.blockchain_db.inner().db_read_access()?;
        for input in &mut inputs {
            // Fully-specified inputs need no hydration.
            if !input.is_compact() {
                continue;
            }

            let output_mined_info =
                db.fetch_output(&input.output_hash())?
                    .ok_or_else(|| CommsInterfaceError::InvalidFullBlock {
                        hash: block_hash,
                        details: format!("Output {} to be spent does not exist in db", input.output_hash()),
                    })?;

            // Outputs without a range proof get a zero hash placeholder.
            let rp_hash = match output_mined_info.output.proof {
                Some(proof) => proof.hash(),
                None => FixedHash::zero(),
            };
            input.add_output_data(
                output_mined_info.output.version,
                output_mined_info.output.features,
                output_mined_info.output.commitment,
                output_mined_info.output.script,
                output_mined_info.output.sender_offset_public_key,
                output_mined_info.output.covenant,
                output_mined_info.output.encrypted_data,
                output_mined_info.output.metadata_signature,
                rp_hash,
                output_mined_info.output.minimum_value_promise,
            );
        }
        debug!(
            target: LOG_TARGET,
            "Hydrated block #{} ({}) with {} input(s) in {:.2?}",
            block_height,
            block_hash.to_hex(),
            inputs.len(),
            timer.elapsed()
        );
        let block = Block::new(header, AggregateBody::new(inputs, outputs, kernels));
        Ok(Arc::new(block))
    }
32✔
963

964
    fn publish_block_event(&self, event: BlockEvent) {
32✔
965
        if let Err(event) = self.block_event_sender.send(Arc::new(event)) {
32✔
966
            debug!(target: LOG_TARGET, "No event subscribers. Event {} dropped.", event.0)
×
967
        }
32✔
968
    }
32✔
969

970
    /// Updates Prometheus-style gauges/counters to reflect the outcome of a block add:
    /// target difficulty per PoW algorithm, tip height, UTXO set size, reorg and orphan counts.
    /// Only compiled when the `metrics` feature is enabled.
    #[cfg(feature = "metrics")]
    async fn update_block_result_metrics(&self, block_add_result: &BlockAddResult) -> Result<(), CommsInterfaceError> {
        // Records the block's target difficulty on the gauge matching its PoW algorithm.
        fn update_target_difficulty(block: &ChainBlock) {
            // Saturate to i64::MAX rather than wrap when the u64 difficulty does not fit.
            let difficulty = i64::try_from(block.accumulated_data().target_difficulty.as_u64()).unwrap_or(i64::MAX);
            let gauge = match block.header().pow_algo() {
                PowAlgorithm::Sha3x => metrics::target_difficulty_sha(),
                PowAlgorithm::RandomXM => metrics::target_difficulty_monero_randomx(),
                PowAlgorithm::RandomXT => metrics::target_difficulty_tari_randomx(),
            };
            gauge.set(difficulty);
        }

        match block_add_result {
            BlockAddResult::Ok(block) => {
                update_target_difficulty(block);
                #[allow(clippy::cast_possible_wrap)]
                metrics::tip_height().set(block.height() as i64);
                let utxo_count = self.blockchain_db.utxo_count().await?;
                metrics::utxo_set_size().set(utxo_count.try_into().unwrap_or(i64::MAX));
            },
            BlockAddResult::ChainReorg { added, removed } => {
                if let Some(fork_height) = added.last().map(|b| b.height()) {
                    #[allow(clippy::cast_possible_wrap)]
                    metrics::tip_height().set(fork_height as i64);
                    metrics::reorg(fork_height, added.len(), removed.len()).inc();

                    let utxo_count = self.blockchain_db.utxo_count().await?;
                    metrics::utxo_set_size().set(utxo_count.try_into().unwrap_or(i64::MAX));
                }
                for reorged_block in added {
                    update_target_difficulty(reorged_block);
                }
            },
            BlockAddResult::OrphanBlock => {
                metrics::orphaned_blocks().inc();
            },
            _ => {},
        }
        Ok(())
    }
30✔
1017

1018
    async fn get_target_difficulty_for_next_block(
3✔
1019
        &self,
3✔
1020
        pow_algo: PowAlgorithm,
3✔
1021
        constants: &ConsensusConstants,
3✔
1022
        current_block_hash: HashOutput,
3✔
1023
    ) -> Result<Difficulty, CommsInterfaceError> {
3✔
1024
        let target_difficulty = self
3✔
1025
            .blockchain_db
3✔
1026
            .fetch_target_difficulty_for_next_block(pow_algo, current_block_hash)
3✔
1027
            .await?;
3✔
1028

1029
        let target = target_difficulty.calculate(
3✔
1030
            constants.min_pow_difficulty(pow_algo),
3✔
1031
            constants.max_pow_difficulty(pow_algo),
3✔
1032
        );
3✔
1033
        trace!(target: LOG_TARGET, "Target difficulty {} for PoW {}", target, pow_algo);
3✔
1034
        Ok(target)
3✔
1035
    }
3✔
1036

1037
    /// Returns the block hash most recently seen by the mempool, converting any
    /// mempool error into a `CommsInterfaceError`.
    pub async fn get_last_seen_hash(&self) -> Result<FixedHash, CommsInterfaceError> {
        let last_seen = self.mempool.get_last_seen_hash().await?;
        Ok(last_seen)
    }
×
1040
}
1041

1042
impl<B> Clone for InboundNodeCommsHandlers<B> {
1043
    fn clone(&self) -> Self {
125✔
1044
        Self {
125✔
1045
            block_event_sender: self.block_event_sender.clone(),
125✔
1046
            blockchain_db: self.blockchain_db.clone(),
125✔
1047
            mempool: self.mempool.clone(),
125✔
1048
            consensus_manager: self.consensus_manager.clone(),
125✔
1049
            list_of_reconciling_blocks: self.list_of_reconciling_blocks.clone(),
125✔
1050
            outbound_nci: self.outbound_nci.clone(),
125✔
1051
            connectivity: self.connectivity.clone(),
125✔
1052
            randomx_factory: self.randomx_factory.clone(),
125✔
1053
        }
125✔
1054
    }
125✔
1055
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc