• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tari-project / tari / 15484013348

06 Jun 2025 06:08AM UTC coverage: 72.04% (+0.3%) from 71.789%
15484013348

push

github

web-flow
fix(network-discovery): add back idle event handling (#7194)

Description
---
fix(network-discovery): add back idle event handling

Motivation and Context
---
network discovery was spinning at full speed because the Idle event
transition was removed. Network logs would rotate < 1s.

```
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG NetworkDiscovery::Ready: Peer list contains 759 entries. Current discovery rounds in this cycle: 0.
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG First active round (current_num_rounds = 0) and num_peers (759) >= min_desired_peers (16). Forcing DHT discovery.
 [comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG Selecting 5 random peers for discovery (last round info available: false, new peers in last round: false).
[comms::dht::network_discovery::ready] [Thread:123190302967360] DEBUG No suitable peers found for the forced DHT discovery round (current_num_rounds = 0 path). Transitioning to Idle.
 [comms::dht::network_discovery] [Thread:123190302967360] DEBUG Transition triggered from current state `Ready` by event `Idle`
comms::dht::network_discovery] [Thread:123190302967360] DEBUG No state transition for event `Idle`. The current state is `Ready`

...instant rinse and repeat...
```

This PR adds the idle state transition back. Note that idle will idle
for 30 minutes so should only transition when all work is done and we
have downloaded sufficient peers.

How Has This Been Tested?
---
Manually - console wallet with empty peer db

What process can a PR reviewer use to test or verify this change?
---

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other... (continued)

3 of 4 new or added lines in 2 files covered. (75.0%)

412 existing lines in 30 files now uncovered.

80882 of 112274 relevant lines covered (72.04%)

242938.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.08
/base_layer/core/src/base_node/sync/header_sync/validator.rs
1
//  Copyright 2020, The Tari Project
2
//
3
//  Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
4
//  following conditions are met:
5
//
6
//  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
7
//  disclaimer.
8
//
9
//  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
10
//  following disclaimer in the documentation and/or other materials provided with the distribution.
11
//
12
//  3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
13
//  products derived from this software without specific prior written permission.
14
//
15
//  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
16
//  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
17
//  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
18
//  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
19
//  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
20
//  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
21
//  USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22
use std::cmp::Ordering;
23

24
use log::*;
25
use primitive_types::U512;
26
use tari_common_types::types::{FixedHash, HashOutput};
27
use tari_utilities::{epoch_time::EpochTime, hex::Hex};
28

29
use crate::{
30
    base_node::sync::{header_sync::HEADER_SYNC_INITIAL_MAX_HEADERS, BlockHeaderSyncError},
31
    blocks::{BlockHeader, BlockHeaderAccumulatedData, BlockHeaderValidationError, ChainHeader},
32
    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, TargetDifficulties},
33
    common::rolling_vec::RollingVec,
34
    consensus::ConsensusManager,
35
    proof_of_work::{randomx_factory::RandomXFactory, PowAlgorithm},
36
    validation::{
37
        header::HeaderFullValidator,
38
        tari_rx_vm_key_height,
39
        DifficultyCalculator,
40
        HeaderChainLinkedValidator,
41
        ValidationError,
42
        TARI_RX_VM_KEY_BLOCK_SWAP,
43
    },
44
};
45

46
const LOG_TARGET: &str = "c::bn::header_sync";
47

48
#[derive(Clone)]
49
pub struct BlockHeaderSyncValidator<B> {
50
    db: AsyncBlockchainDb<B>,
51
    state: Option<State>,
52
    consensus_rules: ConsensusManager,
53
    validator: HeaderFullValidator,
54
}
55

56
#[derive(Debug, Clone)]
57
struct State {
58
    current_height: u64,
59
    timestamps: RollingVec<EpochTime>,
60
    target_difficulties: TargetDifficulties,
61
    previous_accum: BlockHeaderAccumulatedData,
62
    previous_header: BlockHeader,
63
    valid_headers: Vec<ChainHeader>,
64
    vm_key: Vec<(u64, FixedHash)>,
65
}
66

67
impl<B: BlockchainBackend + 'static> BlockHeaderSyncValidator<B> {
68
    pub fn new(db: AsyncBlockchainDb<B>, consensus_rules: ConsensusManager, randomx_factory: RandomXFactory) -> Self {
14✔
69
        let difficulty_calculator = DifficultyCalculator::new(consensus_rules.clone(), randomx_factory);
14✔
70
        let validator = HeaderFullValidator::new(consensus_rules.clone(), difficulty_calculator);
14✔
71
        Self {
14✔
72
            db,
14✔
73
            state: None,
14✔
74
            consensus_rules,
14✔
75
            validator,
14✔
76
        }
14✔
77
    }
14✔
78

79
    #[allow(clippy::ptr_arg)]
80
    pub async fn initialize_state(&mut self, start_hash: &HashOutput) -> Result<(), BlockHeaderSyncError> {
4✔
81
        let start_header = self
4✔
82
            .db
4✔
83
            .fetch_header_by_block_hash(*start_hash)
4✔
84
            .await?
4✔
85
            .ok_or_else(|| BlockHeaderSyncError::StartHashNotFound(start_hash.to_hex()))?;
4✔
86
        let timestamps = self.db.fetch_block_timestamps(*start_hash).await?;
3✔
87
        let target_difficulties = self.db.fetch_target_difficulties_for_next_block(*start_hash).await?;
3✔
88
        let previous_accum = self
3✔
89
            .db
3✔
90
            .fetch_header_accumulated_data(*start_hash)
3✔
91
            .await?
3✔
92
            .ok_or_else(|| ChainStorageError::ValueNotFound {
3✔
93
                entity: "BlockHeaderAccumulatedData",
×
94
                field: "hash",
×
95
                value: start_hash.to_hex(),
×
96
            })?;
3✔
97
        debug!(
3✔
98
            target: LOG_TARGET,
×
99
            "Setting header validator state ({} timestamp(s), target difficulties: {} SHA3, {} Monero RandomX, {} Tari RandomX)",
×
100
            timestamps.len(),
×
101
            target_difficulties.get(PowAlgorithm::Sha3x).map(|t| t.len()).unwrap_or(0),
×
102
            target_difficulties.get(PowAlgorithm::RandomXM).map(|t| t.len()).unwrap_or(0),
×
103
            target_difficulties.get(PowAlgorithm::RandomXT).map(|t| t.len()).unwrap_or(0),
×
104
        );
105

106
        let gen_hash = *self.consensus_rules.get_genesis_block().hash();
3✔
107
        self.state = Some(State {
3✔
108
            current_height: start_header.height,
3✔
109
            timestamps,
3✔
110
            target_difficulties,
3✔
111
            previous_accum,
3✔
112
            previous_header: start_header,
3✔
113
            // One large allocation is usually better even if it is not always used.
3✔
114
            valid_headers: Vec::with_capacity(HEADER_SYNC_INITIAL_MAX_HEADERS),
3✔
115
            vm_key: vec![(0, gen_hash)],
3✔
116
        });
3✔
117

3✔
118
        Ok(())
3✔
119
    }
4✔
120

UNCOV
121
    pub fn current_valid_chain_tip_header(&self) -> Option<&ChainHeader> {
×
UNCOV
122
        self.valid_headers().last()
×
UNCOV
123
    }
×
124

125
    pub async fn validate(&mut self, header: BlockHeader) -> Result<U512, BlockHeaderSyncError> {
3✔
126
        let constants = self.consensus_rules.consensus_constants(header.height);
3✔
127
        if constants.effective_from_height() == header.height {
3✔
128
            if let Some(&mut ref mut mut_state) = self.state.as_mut() {
×
129
                // We need to update the target difficulties for the new algorithm
130
                mut_state
×
131
                    .target_difficulties
×
132
                    .update_algos(&self.consensus_rules, header.height)
×
133
                    .map_err(BlockHeaderSyncError::TargetDifficultiesError)?;
×
134
            }
×
135
        }
3✔
136

137
        let state = self.state();
3✔
138

139
        let target_difficulty = state
3✔
140
            .target_difficulties
3✔
141
            .get(header.pow_algo())
3✔
142
            .map_err(BlockHeaderSyncError::TargetDifficultiesError)?
3✔
143
            .calculate(
3✔
144
                constants.min_pow_difficulty(header.pow_algo()),
3✔
145
                constants.max_pow_difficulty(header.pow_algo()),
3✔
146
            );
3✔
147

148
        let result = {
3✔
149
            let txn = self.db.inner().db_read_access()?;
3✔
150
            let vm_key_height = tari_rx_vm_key_height(header.height);
3✔
151
            let vm_key = match txn.fetch_chain_header_by_height(vm_key_height) {
3✔
152
                Ok(header) => *header.hash(),
3✔
153
                Err(_) => {
154
                    // header not found, lets search our cached headers
155
                    let mut vm_key = None;
×
156
                    for (height, hash) in &state.vm_key {
×
157
                        if *height == vm_key_height {
×
158
                            vm_key = Some(*hash);
×
159
                            break;
×
160
                        }
×
161
                    }
162
                    vm_key.ok_or(ChainStorageError::UnexpectedResult(
×
163
                        "Could not find header in database or cache".to_string(),
×
164
                    ))?
×
165
                },
166
            };
167
            self.validator.validate(
3✔
168
                &*txn,
3✔
169
                &header,
3✔
170
                &state.previous_header,
3✔
171
                &state.timestamps,
3✔
172
                Some(target_difficulty),
3✔
173
                vm_key,
3✔
174
            )
3✔
175
        };
176
        let achieved_target = match result {
2✔
177
            Ok(achieved_target) => achieved_target,
2✔
178
            // future timelimit validation can succeed at a later time. As the block is not yet valid, we discard it
179
            // for now and ban the peer, but wont blacklist the block.
180
            Err(e @ ValidationError::BlockHeaderError(BlockHeaderValidationError::InvalidTimestampFutureTimeLimit)) => {
×
181
                return Err(e.into())
×
182
            },
183
            // We dont want to mark a block as bad for internal failures
184
            Err(
185
                e @ ValidationError::FatalStorageError(_) |
×
186
                e @ ValidationError::IncorrectNumberOfTimestampsProvided { .. },
×
187
            ) => return Err(e.into()),
×
188
            // We dont have to mark the block twice
189
            Err(e @ ValidationError::BadBlockFound { .. }) => return Err(e.into()),
×
190

191
            Err(e) => {
1✔
192
                let mut txn = self.db.write_transaction();
1✔
193
                txn.insert_bad_block(header.hash(), header.height, e.to_string());
1✔
194
                txn.commit().await?;
1✔
195
                return Err(e.into());
1✔
196
            },
197
        };
198

199
        // Header is valid, add this header onto the validation state for the next round
200
        // Mutable borrow done later in the function to allow multiple immutable borrows before this line. This has
201
        // nothing to do with locking or concurrency.
202
        let state = self.state_mut();
2✔
203
        state.previous_header = header.clone();
2✔
204

2✔
205
        // Ensure that timestamps are inserted in sorted order
2✔
206
        let maybe_index = state.timestamps.iter().position(|ts| *ts >= header.timestamp());
5✔
207
        match maybe_index {
2✔
208
            Some(pos) => {
×
209
                state.timestamps.insert(pos, header.timestamp());
×
210
            },
×
211
            None => {
2✔
212
                state.timestamps.push(header.timestamp());
2✔
213
            },
2✔
214
        }
215

216
        state.current_height = header.height;
2✔
217
        // Add a "more recent" datapoint onto the target difficulty
2✔
218
        state
2✔
219
            .target_difficulties
2✔
220
            .add_back(&header, target_difficulty)
2✔
221
            .map_err(ChainStorageError::UnexpectedResult)?;
2✔
222

223
        let accumulated_data = BlockHeaderAccumulatedData::builder(&state.previous_accum)
2✔
224
            .with_hash(header.hash())
2✔
225
            .with_achieved_target_difficulty(achieved_target)
2✔
226
            .with_total_kernel_offset(header.total_kernel_offset.clone())
2✔
227
            .build()?;
2✔
228

229
        let total_accumulated_difficulty = accumulated_data.total_accumulated_difficulty;
2✔
230
        // NOTE: accumulated_data constructed from header so they are guaranteed to correspond
2✔
231
        let chain_header = ChainHeader::try_construct(header, accumulated_data).unwrap();
2✔
232

2✔
233
        state.previous_accum = chain_header.accumulated_data().clone();
2✔
234
        if chain_header.header().height % TARI_RX_VM_KEY_BLOCK_SWAP == 0 {
2✔
235
            // we need to save the hash of this header and height
×
236
            state.vm_key.push((chain_header.header().height, *chain_header.hash()));
×
237
        }
2✔
238
        state.valid_headers.push(chain_header);
2✔
239

2✔
240
        Ok(total_accumulated_difficulty)
2✔
241
    }
3✔
242

243
    /// Drains and returns all the headers that were validated.
244
    ///
245
    /// ## Panics
246
    ///
247
    /// Panics if initialize_state was not called prior to calling this function
UNCOV
248
    pub fn take_valid_headers(&mut self) -> Vec<ChainHeader> {
×
UNCOV
249
        self.state_mut().valid_headers.drain(..).collect::<Vec<_>>()
×
UNCOV
250
    }
×
251

252
    /// Returns a slice containing the current valid headers
253
    ///
254
    /// ## Panics
255
    ///
256
    /// Panics if initialize_state was not called prior to calling this function
257
    pub fn valid_headers(&self) -> &[ChainHeader] {
4✔
258
        &self.state().valid_headers
4✔
259
    }
4✔
260

UNCOV
261
    pub fn compare_chains(&self, our_header: &ChainHeader, their_header: &ChainHeader) -> Ordering {
×
UNCOV
262
        debug!(
×
263
            target: LOG_TARGET,
×
264
            "Comparing PoW on remote header #{} and local header #{}",
×
265
            their_header.height(),
×
266
            our_header.height()
×
267
        );
268

UNCOV
269
        self.consensus_rules
×
UNCOV
270
            .chain_strength_comparer()
×
UNCOV
271
            .compare(our_header, their_header)
×
UNCOV
272
    }
×
273

274
    fn state_mut(&mut self) -> &mut State {
2✔
275
        self.state
2✔
276
            .as_mut()
2✔
277
            .expect("state_mut() called before state was initialized (using the `begin` method)")
2✔
278
    }
2✔
279

280
    fn state(&self) -> &State {
8✔
281
        self.state
8✔
282
            .as_ref()
8✔
283
            .expect("state() called before state was initialized (using the `begin` method)")
8✔
284
    }
8✔
285
}
286

287
#[cfg(test)]
288
mod test {
289
    use tari_common::configuration::Network;
290
    use tari_test_utils::unpack_enum;
291

292
    use super::*;
293
    use crate::{
294
        blocks::BlockHeader,
295
        proof_of_work::PowAlgorithm,
296
        test_helpers::blockchain::{create_new_blockchain, TempDatabase},
297
    };
298

299
    fn setup() -> (
4✔
300
        BlockHeaderSyncValidator<TempDatabase>,
4✔
301
        AsyncBlockchainDb<TempDatabase>,
4✔
302
        ConsensusManager,
4✔
303
    ) {
4✔
304
        let rules = ConsensusManager::builder(Network::LocalNet).build().unwrap();
4✔
305
        let randomx_factory = RandomXFactory::default();
4✔
306
        let db = create_new_blockchain();
4✔
307
        (
4✔
308
            BlockHeaderSyncValidator::new(db.clone().into(), rules.clone(), randomx_factory),
4✔
309
            db.into(),
4✔
310
            rules,
4✔
311
        )
4✔
312
    }
4✔
313

314
    async fn setup_with_headers(
3✔
315
        n: usize,
3✔
316
    ) -> (
3✔
317
        BlockHeaderSyncValidator<TempDatabase>,
3✔
318
        AsyncBlockchainDb<TempDatabase>,
3✔
319
        ChainHeader,
3✔
320
    ) {
3✔
321
        let (validator, db, cm) = setup();
3✔
322
        let mut tip = db.fetch_tip_header().await.unwrap();
3✔
323
        for _ in 0..n {
3✔
324
            let mut header = BlockHeader::from_previous(tip.header());
14✔
325
            header.version = cm.consensus_constants(header.height).blockchain_version();
14✔
326
            // Needed to have unique keys for the blockchain db mmr count indexes (MDB_KEY_EXIST error)
14✔
327
            header.kernel_mmr_size += 1;
14✔
328
            header.output_smt_size += 1;
14✔
329
            let acc_data = BlockHeaderAccumulatedData {
14✔
330
                hash: header.hash(),
14✔
331
                ..Default::default()
14✔
332
            };
14✔
333

14✔
334
            let chain_header = ChainHeader::try_construct(header.clone(), acc_data.clone()).unwrap();
14✔
335
            db.insert_valid_headers(vec![chain_header.clone()]).await.unwrap();
14✔
336
            tip = chain_header;
14✔
337
        }
338

339
        (validator, db, tip)
3✔
340
    }
3✔
341

342
    mod initialize_state {
343
        use std::convert::TryInto;
344

345
        use super::*;
346

347
        #[tokio::test]
348
        async fn it_initializes_state_to_given_header() {
1✔
349
            let (mut validator, _, tip) = setup_with_headers(1).await;
1✔
350
            validator.initialize_state(&tip.header().hash()).await.unwrap();
1✔
351
            let state = validator.state();
1✔
352
            assert!(state.valid_headers.is_empty());
1✔
353
            assert_eq!(state.target_difficulties.get(PowAlgorithm::Sha3x).unwrap().len(), 2);
1✔
354
            assert!(state
1✔
355
                .target_difficulties
1✔
356
                .get(PowAlgorithm::RandomXM)
1✔
357
                .unwrap()
1✔
358
                .is_empty());
1✔
359
            assert_eq!(state.timestamps.len(), 2);
1✔
360
            assert_eq!(state.current_height, 1);
1✔
361
        }
1✔
362

363
        #[tokio::test]
364
        async fn it_errors_if_hash_does_not_exist() {
1✔
365
            let (mut validator, _, _cm) = setup();
1✔
366
            let start_hash = vec![0; 32];
1✔
367
            let err = validator
1✔
368
                .initialize_state(&start_hash.clone().try_into().unwrap())
1✔
369
                .await
1✔
370
                .unwrap_err();
1✔
371
            unpack_enum!(BlockHeaderSyncError::StartHashNotFound(hash) = err);
1✔
372
            assert_eq!(hash, start_hash.to_hex());
1✔
373
        }
1✔
374
    }
375

376
    mod validate {
377
        use super::*;
378

379
        #[tokio::test]
380
        async fn it_passes_if_headers_are_valid() {
1✔
381
            let (mut validator, _, tip) = setup_with_headers(1).await;
1✔
382
            validator.initialize_state(tip.hash()).await.unwrap();
1✔
383
            assert!(validator.valid_headers().is_empty());
1✔
384
            let mut next = BlockHeader::from_previous(tip.header());
1✔
385
            next.timestamp = tip.header().timestamp.checked_add(EpochTime::from(1)).unwrap();
1✔
386
            validator.validate(next).await.unwrap();
1✔
387
            assert_eq!(validator.valid_headers().len(), 1);
1✔
388
            let tip = validator.valid_headers().last().cloned().unwrap();
1✔
389
            let mut next = BlockHeader::from_previous(tip.header());
1✔
390
            next.timestamp = tip.header().timestamp.checked_add(EpochTime::from(1)).unwrap();
1✔
391
            validator.validate(next).await.unwrap();
1✔
392
            assert_eq!(validator.valid_headers().len(), 2);
1✔
393
        }
1✔
394

395
        #[tokio::test]
396
        async fn it_fails_if_height_is_not_serial() {
1✔
397
            let (mut validator, _, tip) = setup_with_headers(12).await;
1✔
398
            validator.initialize_state(tip.hash()).await.unwrap();
1✔
399
            let mut next = BlockHeader::from_previous(tip.header());
1✔
400
            next.height = 14;
1✔
401
            let err = validator.validate(next).await.unwrap_err();
1✔
402
            unpack_enum!(BlockHeaderSyncError::ValidationFailed(val_err) = err);
1✔
403
            unpack_enum!(ValidationError::BlockHeaderError(header_err) = val_err);
1✔
404
            unpack_enum!(BlockHeaderValidationError::InvalidHeight { actual, expected } = header_err);
1✔
405
            assert_eq!(actual, 14);
1✔
406
            assert_eq!(expected, 13);
1✔
407
        }
1✔
408
    }
409
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc