• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

input-output-hk / catalyst-libs / 16618685382

pending completion
16618685382

Pull #419

github

web-flow
Merge 3cdf83512 into 654b17f07
Pull Request #419: feat(docs): Form Element Documentation

10738 of 16725 relevant lines covered (64.2%)

2361.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rust/cardano-chain-follower/src/chain_sync_live_chains.rs
1
//! Storage of each Live Chain per Blockchain.
2

3
use std::{
4
    ops::Bound,
5
    sync::{Arc, LazyLock, RwLock},
6
    time::Duration,
7
};
8

9
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point, Slot};
10
use crossbeam_skiplist::SkipMap;
11
use rayon::prelude::*;
12
use strum::IntoEnumIterator;
13
use tracing::{debug, error};
14

15
use crate::{
16
    error::{Error, Result},
17
    mithril_snapshot_data::latest_mithril_snapshot_id,
18
    stats::{self},
19
};
20

21
/// Type we use to manage the Sync Task handle map.
22
type LiveChainBlockList = SkipMap<Point, MultiEraBlock>;
23

24
/// Because we have multi-entry relationships in the live-chain, it need to protect it
25
/// with a `read/write lock`. The underlying `SkipMap` is still capable of multiple
26
/// simultaneous reads from multiple threads which is the most common access.
27
#[derive(Clone)]
28
struct ProtectedLiveChainBlockList(Arc<RwLock<LiveChainBlockList>>);
29

30
/// Handle to the mithril sync thread. One for each Network ONLY.
31
static LIVE_CHAINS: LazyLock<SkipMap<Network, ProtectedLiveChainBlockList>> = LazyLock::new(|| {
×
32
    let map = SkipMap::new();
×
33
    for network in Network::iter() {
×
34
        map.insert(network, ProtectedLiveChainBlockList::new());
×
35
    }
×
36
    map
×
37
});
×
38

39
/// Latest TIP received from the Peer Node.
40
static PEER_TIP: LazyLock<SkipMap<Network, Point>> = LazyLock::new(|| {
×
41
    let map = SkipMap::new();
×
42
    for network in Network::iter() {
×
43
        map.insert(network, Point::UNKNOWN);
×
44
    }
×
45
    map
×
46
});
×
47

48
/// Initial slot age to probe.
49
const INITIAL_SLOT_PROBE_AGE: u64 = 40;
50

51
/// Set the last TIP received from the peer.
52
fn update_peer_tip(chain: Network, tip: Point) {
×
53
    PEER_TIP.insert(chain, tip);
×
54
}
×
55

56
/// Get the last TIP received from the peer.
57
/// If the peer tip doesn't exist, get the UNKNOWN point.
58
pub(crate) fn get_peer_tip(chain: Network) -> Point {
×
59
    (*PEER_TIP.get_or_insert(chain, Point::UNKNOWN).value()).clone()
×
60
}
×
61

62
/// Number of seconds to wait if we detect a `SyncReady` race condition.
63
const DATA_RACE_BACKOFF_SECS: u64 = 2;
64

65
impl ProtectedLiveChainBlockList {
66
    /// Create a new instance of the protected Live Chain skip map.
67
    fn new() -> Self {
×
68
        ProtectedLiveChainBlockList(Arc::new(RwLock::new(LiveChainBlockList::new())))
×
69
    }
×
70

71
    /// Get the `nth` Live block immediately following the specified block.
72
    /// If the search is NOT strict, then the point requested is never found.
73
    /// 0 = The Block immediately after the requested point.
74
    /// 1+ = The block that follows the block after the requested point
75
    /// negative = The block before the requested point.
76
    fn get_block(&self, point: &Point, mut advance: i64, strict: bool) -> Option<MultiEraBlock> {
×
77
        let chain = self.0.read().ok()?;
×
78

79
        let mut this = if strict && !point.is_fuzzy() {
×
80
            // Strict with concrete point.
81
            // We can call `get` only for non fuzzy points.
82
            // Because of the `Point` type equality strictly defined that fuzzy point and non fuzzy
83
            // point are always not equal, even if they are pointing to the same slot.
84
            chain.get(point)?
×
85
        } else if strict {
×
86
            // Strict but fuzzy point.
87
            // For the fuzzy point make a a fuzzy lookup forwards (including the point).
88
            // Because of the `Point` type equality strictly defined that fuzzy point and non fuzzy
89
            // point are always not equal, even if they are pointing to the same slot.
90
            let found = chain.lower_bound(Bound::Included(point))?;
×
91
            // make sure the found point is what we wanted.
92
            if point.slot_or_default() != found.value().point().slot_or_default() {
×
93
                return None;
×
94
            }
×
95
            found
×
96
        } else if advance < 0 {
×
97
            // This is a fuzzy lookup backwards.
98
            advance = advance.saturating_add(1);
×
99
            chain.upper_bound(Bound::Excluded(point))?
×
100
        } else {
101
            // This is a fuzzy lookup forwards.
102
            chain.lower_bound(Bound::Excluded(point))?
×
103
        };
104

105
        // If we are stepping backwards, look backwards.
106
        while advance < 0 {
×
107
            advance = advance.saturating_add(1);
×
108
            this = this.prev()?;
×
109
        }
110

111
        // If we are stepping forwards, look forwards.
112
        while advance > 0 {
×
113
            advance = advance.saturating_sub(1);
×
114
            this = this.next()?;
×
115
        }
116

117
        // Return the block we found.
118
        Some(this.value().clone())
×
119
    }
×
120

121
    /// Get the earliest block in the Live Chain
122
    fn get_earliest_block(&self) -> Option<MultiEraBlock> {
×
123
        let chain = self.0.read().ok()?;
×
124
        let entry = chain.front()?;
×
125
        Some(entry.value().clone())
×
126
    }
×
127

128
    /// Get the point of the first known block in the Live Chain.
129
    fn get_first_live_point(live_chain: &LiveChainBlockList) -> Result<Point> {
×
130
        let Some(check_first_live_entry) = live_chain.front() else {
×
131
            return Err(Error::LiveSync(
×
132
                "First Block not found in the Live Chain during Backfill".to_string(),
×
133
            ));
×
134
        };
135
        let check_first_live_block = check_first_live_entry.value();
×
136
        Ok(check_first_live_block.point())
×
137
    }
×
138

139
    /// Get the point of the last known block in the Live Chain.
140
    fn get_last_live_point(live_chain: &LiveChainBlockList) -> Point {
×
141
        let Some(check_last_live_entry) = live_chain.back() else {
×
142
            // Its not an error if we can't get a latest block because the chain is empty,
143
            // so report that we don't know...
144
            return Point::UNKNOWN;
×
145
        };
146
        let check_last_live_block = check_last_live_entry.value();
×
147
        check_last_live_block.point()
×
148
    }
×
149

150
    /// Atomic Backfill the chain with the given blocks
151
    /// Blocks must be sorted in order from earliest to latest.
152
    /// Final block MUST seamlessly link to the current head of the live chain. (Enforced)
153
    /// First block MUST seamlessly link to the Tip of the Immutable chain. (Enforced)
154
    /// The blocks MUST be contiguous and properly self referential.
155
    /// Note: This last condition is NOT enforced, but must be met or block chain
156
    /// iteration will fail.
157
    fn backfill(&self, chain: Network, blocks: &[MultiEraBlock]) -> Result<()> {
×
158
        let live_chain = self.0.write().map_err(|_| Error::Internal)?;
×
159

160
        // Make sure our first live block == the last mithril tip.
161
        // Ensures we are properly connected to the Mithril Chain.
162
        let first_block_point = blocks
×
163
            .first()
×
164
            .ok_or(Error::LiveSync("No first block for backfill.".to_string()))?
×
165
            .point();
×
166
        let latest_mithril_tip = latest_mithril_snapshot_id(chain).tip();
×
167
        if !first_block_point.strict_eq(&latest_mithril_tip) {
×
168
            return Err(Error::LiveSync(format!(
×
169
                "First Block of Live BackFill {first_block_point} MUST be last block of Mithril Snapshot {latest_mithril_tip}."
×
170
            )));
×
171
        }
×
172

173
        // Get the current Oldest block in the live chain.
174
        let check_first_live_point = Self::get_first_live_point(&live_chain)?;
×
175

176
        let last_backfill_block = blocks
×
177
            .last()
×
178
            .ok_or(Error::LiveSync("No last block for backfill.".to_string()))?
×
179
            .clone();
×
180
        let last_backfill_point = last_backfill_block.point();
×
181

×
182
        // Make sure the backfill will properly connect the partial Live chain to the Mithril
×
183
        // chain.
×
184
        if !last_backfill_point.strict_eq(&check_first_live_point) {
×
185
            return Err(Error::LiveSync(format!(
×
186
                "Last Block of Live BackFill {last_backfill_point} MUST be First block of current Live Chain {check_first_live_point}."
×
187
            )));
×
188
        }
×
189

×
190
        // SkipMap is thread-safe, so we can parallel iterate inserting the blocks.
×
191
        blocks.par_iter().for_each(|block| {
×
192
            let _unused = live_chain.insert(block.point(), block.clone());
×
193
        });
×
194

×
195
        // End of Successful backfill == Reaching TIP, because live sync is always at tip.
×
196
        stats::tip_reached(chain);
×
197

×
198
        Ok(())
×
199
    }
×
200

201
    /// Check if the given point is strictly in the live-chain. This means the slot and
202
    /// block hash MUST be present.
203
    fn strict_block_lookup(live_chain: &LiveChainBlockList, point: &Point) -> bool {
×
204
        if let Some(found_block) = live_chain.get(point) {
×
205
            return found_block.value().point().strict_eq(point);
×
206
        }
×
207
        false
×
208
    }
×
209

210
    /// Adds a block to the tip of the live chain, and automatically purges blocks that
211
    /// would be lost due to rollback. Will REFUSE to add a block which does NOT have
212
    /// a proper "previous" point defined.
213
    fn add_block_to_tip(
×
214
        &self, chain: Network, block: MultiEraBlock, fork_count: &mut Fork, tip: Point,
×
215
    ) -> Result<()> {
×
216
        let live_chain = self.0.write().map_err(|_| Error::Internal)?;
×
217

218
        // Check if the insert is the next logical block in the live chain.
219
        // Most likely case, so check it first.
220
        let previous_point = block.previous();
×
221
        let last_live_point = Self::get_last_live_point(&live_chain);
×
222
        if !previous_point.strict_eq(&last_live_point) {
×
223
            // Detected a rollback, so increase the fork count.
224
            fork_count.incr();
×
225
            let mut rollback_size: u64 = 0;
×
226

×
227
            // We are NOT contiguous, so check if we can become contiguous with a rollback.
×
228
            debug!("Detected non-contiguous block, rolling back. Fork: {fork_count}");
×
229

230
            // First check if the previous is >= the earliest block in the live chain.
231
            // This is because when we start syncing we could rollback earlier than our
232
            // previously known earliest block.
233
            // Also check the point we want to link to actually exists.  If either are not true,
234
            // Then we could be trying to roll back to an earlier block than our earliest known
235
            // block.
236
            let check_first_live_point = Self::get_first_live_point(&live_chain)?;
×
237
            if (block.point() < check_first_live_point)
×
238
                || !Self::strict_block_lookup(&live_chain, &previous_point)
×
239
            {
240
                debug!("Rollback before live chain, clear it.");
×
241
                // We rolled back earlier than the current live chain.
242
                // Purge the entire chain, and just add this one block as the new tip.
243
                rollback_size = live_chain.len() as u64;
×
244
                live_chain.clear();
×
245
            } else {
246
                // If we get here we know for a fact that the previous block exists.
247
                // Remove the latest live block, and keep removing it until we re-establish
248
                // connection with the chain sequence.
249
                // We search backwards because a rollback is more likely in the newest blocks than
250
                // the oldest.
251
                while let Some(popped) = live_chain.pop_back() {
×
252
                    rollback_size = rollback_size.saturating_add(1);
×
253
                    if previous_point.strict_eq(&popped.value().previous()) {
×
254
                        // We are now contiguous, so stop purging.
255
                        break;
×
256
                    }
×
257
                }
258
            }
259

260
            // Record a rollback statistic (We record the ACTUAL size our rollback effected our
261
            // internal live chain, not what the node thinks.)
262
            stats::rollback::rollback(
×
263
                chain,
×
264
                stats::rollback::RollbackType::LiveChain,
×
265
                rollback_size,
×
266
            );
×
267
        }
×
268

269
        let head_slot = block.point().slot_or_default();
×
270

×
271
        // Add the block to the tip of the Live Chain.
×
272
        let _unused = live_chain.insert(block.point(), block);
×
273

×
274
        let tip_slot = tip.slot_or_default();
×
275
        update_peer_tip(chain, tip);
×
276

×
277
        // Record the new live chain stats after we add a new block.
×
278
        stats::new_live_block(chain, live_chain.len() as u64, head_slot, tip_slot);
×
279

×
280
        Ok(())
×
281
    }
×
282

283
    /// Checks if the point exists in the live chain.
284
    /// If it does, removes all block preceding it (but not the point itself).
285
    /// Will refuse to purge if the point is not the TIP of the mithril chain.
286
    fn purge(&self, chain: Network, point: &Point) -> Result<()> {
×
287
        // Make sure our first live block == the last mithril tip.
×
288
        // Ensures we are properly connected to the Mithril Chain.
×
289
        // But don't check this if we are about to purge the entire chain.
×
290
        // We do this before we bother locking the chain for update.
×
291
        if *point != Point::TIP {
×
292
            let latest_mithril_tip = latest_mithril_snapshot_id(chain).tip();
×
293
            if !point.strict_eq(&latest_mithril_tip) {
×
294
                return Err(Error::LiveSync(format!(
×
295
                "First Block of Live Purge {point} MUST be last block of Mithril Snapshot {latest_mithril_tip}."
×
296
            )));
×
297
            }
×
298
        }
×
299

300
        let live_chain = self.0.write().map_err(|_| Error::Internal)?;
×
301

302
        // Special Case.
303
        // If the Purge Point == TIP_POINT, then we purge the entire chain.
304
        if *point == Point::TIP {
×
305
            live_chain.clear();
×
306
        } else {
×
307
            // If the block we want to purge upto must be in the chain.
308
            let Some(purge_start_block_entry) = live_chain.get(point) else {
×
309
                return Err(Error::LiveSync(format!(
×
310
                    "The block to purge to {point} is not in the Live chain."
×
311
                )));
×
312
            };
313

314
            // Make sure the block that IS present, is the actual block, by strict equality.
315
            if !purge_start_block_entry.value().point().strict_eq(point) {
×
316
                return Err(Error::LiveSync(format!(
×
317
                "The block to purge to {point} slot is in the live chain, but its hashes do not match."
×
318
            )));
×
319
            }
×
320

321
            // Purge every block prior to the purge point.
322
            while let Some(previous_block) = purge_start_block_entry.prev() {
×
323
                let _unused = previous_block.remove();
×
324
            }
×
325

326
            // Try and FORCE the skip map to reclaim its memory
327
            crossbeam_epoch::pin().flush();
×
328
            crossbeam_epoch::pin().flush();
×
329
        }
330

331
        Ok(())
×
332
    }
×
333

334
    /// Get the current number of blocks in the live chain
335
    fn len(&self) -> usize {
×
336
        if let Ok(chain) = self.0.read() {
×
337
            chain.len()
×
338
        } else {
339
            0
×
340
        }
341
    }
×
342

343
    /// Get chain sync intersection points for communicating with peer node.
344
    fn get_intersect_points(&self) -> Vec<pallas::network::miniprotocols::Point> {
×
345
        let mut intersect_points = Vec::new();
×
346

347
        let Ok(chain) = self.0.read() else {
×
348
            return intersect_points;
×
349
        };
350

351
        // Add the top 4 blocks as the first points to intersect.
352
        let Some(entry) = chain.back() else {
×
353
            return intersect_points;
×
354
        };
355
        intersect_points.push(entry.value().point().into());
×
356
        for _ in 0..2 {
×
357
            if let Some(entry) = entry.prev() {
×
358
                intersect_points.push(entry.value().point().into());
×
359
            } else {
×
360
                return intersect_points;
×
361
            };
362
        }
363

364
        // Now find points based on an every increasing Slot age.
365
        let mut slot_age: Slot = INITIAL_SLOT_PROBE_AGE.into();
×
366
        let reference_slot = entry.value().point().slot_or_default();
×
367
        let mut previous_point = entry.value().point();
×
368

369
        // Loop until we exhaust probe slots, OR we would step past genesis.
370
        // It is ok because slot implement saturating subtraction.
371
        #[allow(clippy::arithmetic_side_effects)]
372
        while slot_age < reference_slot {
×
373
            let ref_point = Point::fuzzy(reference_slot - slot_age);
×
374
            let Some(entry) = chain.lower_bound(Bound::Included(&ref_point)) else {
×
375
                break;
×
376
            };
377
            if entry.value().point() == previous_point {
×
378
                break;
×
379
            };
×
380
            previous_point = entry.value().point();
×
381
            intersect_points.push(previous_point.clone().into());
×
382
            slot_age *= 2;
×
383
        }
384

385
        intersect_points
×
386
    }
×
387

388
    /// Given a known point on the live chain, and a fork count, find the best block we
389
    /// have.
390
    fn find_best_fork_block(
×
391
        &self, point: &Point, previous_point: &Point, fork: Fork,
×
392
    ) -> Option<(MultiEraBlock, u64)> {
×
393
        let mut rollback_depth: u64 = 0;
×
394
        let Ok(chain) = self.0.read() else {
×
395
            return None;
×
396
        };
397

398
        // Get the block <= the current slot.
399
        let ref_point = Point::fuzzy(point.slot_or_default());
×
400
        let mut entry = chain.upper_bound(Bound::Included(&ref_point))?;
×
401

402
        let mut this_block = entry.value().clone();
×
403
        // Check if the previous block is the one we previously knew, and if so, thats the best
×
404
        // block.
×
405
        if this_block.point().strict_eq(previous_point) {
×
406
            return Some((this_block, rollback_depth));
×
407
        }
×
408

409
        // Search backwards for a fork smaller than or equal to the one we know.
410
        while this_block.fork() > fork {
×
411
            rollback_depth = rollback_depth.saturating_add(1);
×
412
            entry = entry.prev()?;
×
413

414
            this_block = entry.value().clone();
×
415
        }
416

417
        Some((this_block, rollback_depth))
×
418
    }
×
419

420
    /// Get the point of the block at the head of the live chain.
421
    fn get_live_head_point(&self) -> Option<Point> {
×
422
        let live_chain = self.0.read().map_err(|_| Error::Internal).ok()?;
×
423

424
        let head_point = Self::get_last_live_point(&live_chain);
×
425
        if head_point == Point::UNKNOWN {
×
426
            return None;
×
427
        }
×
428

×
429
        Some(head_point)
×
430
    }
×
431
}
432

433
/// Get the `LiveChainBlockList` for a particular `Network`.
434
fn get_live_chain(chain: Network) -> ProtectedLiveChainBlockList {
×
435
    // Get a reference to our live chain storage.
436
    // This SHOULD always exist, because its initialized exhaustively.
437
    // If this FAILS, Recreate a blank chain, but log an error as its a serious UNRECOVERABLE
438
    // BUG.
439
    let entry = if let Some(entry) = LIVE_CHAINS.get(&chain) {
×
440
        entry
×
441
    } else {
442
        error!(
×
443
            chain = chain.to_string(),
×
444
            "Internal Error: Chain Sync Failed to find chain in LIVE_CHAINS"
×
445
        );
446

447
        // Try and correct the error.
448
        LIVE_CHAINS.insert(chain, ProtectedLiveChainBlockList::new());
×
449

×
450
        // This should NOT fail, because we just inserted it, its catastrophic failure if it does.
×
451
        #[allow(clippy::expect_used)]
×
452
        LIVE_CHAINS
×
453
            .get(&chain)
×
454
            .expect("Internal Error: Chain Sync Failed to find chain in LIVE_CHAINS")
×
455
    };
456

457
    let value = entry.value();
×
458
    value.clone()
×
459
}
×
460

461
/// Get the head `Point` currently in the live chain.
462
pub(crate) fn get_live_head_point(chain: Network) -> Option<Point> {
×
463
    let live_chain = get_live_chain(chain);
×
464
    live_chain.get_live_head_point()
×
465
}
×
466

467
/// Get the Live block relative to the specified point.
468
/// The starting block must exist if the search is strict.
469
pub(crate) fn get_live_block(
×
470
    chain: Network, point: &Point, advance: i64, strict: bool,
×
471
) -> Option<MultiEraBlock> {
×
472
    let live_chain = get_live_chain(chain);
×
473
    live_chain.get_block(point, advance, strict)
×
474
}
×
475

476
/// Get the fill to point for a chain.
477
///
478
/// Returns the Point of the block we are filling up-to, and it's fork count.
479
///
480
/// Note: It MAY change between calling this function and actually backfilling.
481
/// This is expected and normal behavior.
482
pub(crate) async fn get_fill_to_point(chain: Network) -> (Point, u64) {
×
483
    let live_chain = get_live_chain(chain);
×
484

485
    loop {
486
        if let Some(earliest_block) = live_chain.get_earliest_block() {
×
487
            return (earliest_block.point(), earliest_block.fork().into());
×
488
        }
×
489
        // Nothing in the Live chain to sync to, so wait until there is.
×
490
        tokio::time::sleep(Duration::from_secs(DATA_RACE_BACKOFF_SECS)).await;
×
491
    }
492
}
×
493

494
/// Insert a block into the live chain (in-order).
495
/// Can ONLY be used to add a new tip block to the live chain.
496
/// `rollback_count` should be set to 1 on the very first connection, after that,
497
/// it is maintained by this function, and MUST not be modified elsewhere.
498
pub(crate) fn live_chain_add_block_to_tip(
×
499
    chain: Network, block: MultiEraBlock, fork: &mut Fork, tip: Point,
×
500
) -> Result<()> {
×
501
    let live_chain = get_live_chain(chain);
×
502
    live_chain.add_block_to_tip(chain, block, fork, tip)
×
503
}
×
504

505
/// Backfill the live chain with the block set provided.
506
pub(crate) fn live_chain_backfill(chain: Network, blocks: &[MultiEraBlock]) -> Result<()> {
×
507
    let live_chain = get_live_chain(chain);
×
508
    live_chain.backfill(chain, blocks)
×
509
}
×
510

511
/// Get the length of the live chain.
512
/// Probably used by debug code only, so its ok if this is not use.
513
pub(crate) fn live_chain_length(chain: Network) -> usize {
×
514
    let live_chain = get_live_chain(chain);
×
515
    live_chain.len()
×
516
}
×
517

518
/// On an immutable update, purge the live-chain up to the new immutable tip.
519
/// Will error if the point is not in the Live chain.
520
pub(crate) fn purge_live_chain(chain: Network, point: &Point) -> Result<()> {
×
521
    let live_chain = get_live_chain(chain);
×
522
    live_chain.purge(chain, point)
×
523
}
×
524

525
/// Get intersection points to try and find best point to connect to the node on
526
/// reconnect.
527
pub(crate) fn get_intersect_points(chain: Network) -> Vec<pallas::network::miniprotocols::Point> {
×
528
    let live_chain = get_live_chain(chain);
×
529
    live_chain.get_intersect_points()
×
530
}
×
531

532
/// Find best block from a fork relative to a point.
533
pub(crate) fn find_best_fork_block(
×
534
    chain: Network, point: &Point, previous_point: &Point, fork: Fork,
×
535
) -> Option<(MultiEraBlock, u64)> {
×
536
    let live_chain = get_live_chain(chain);
×
537
    live_chain.find_best_fork_block(point, previous_point, fork)
×
538
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc