• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

input-output-hk / catalyst-libs / 14057701605

25 Mar 2025 10:45AM UTC coverage: 65.037% (-0.06%) from 65.094%
14057701605

Pull #245

github

web-flow
Merge 37e68e8f9 into 6c1f9eb1c
Pull Request #245: fix(rust/cardano-chain-follower): fix immutable roll forward events

31 of 102 new or added lines in 3 files covered. (30.39%)

7 existing lines in 1 file now uncovered.

10203 of 15688 relevant lines covered (65.04%)

2824.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rust/cardano-chain-follower/src/chain_sync_live_chains.rs
1
//! Storage of each Live Chain per Blockchain.
2

3
use std::{
4
    ops::Bound,
5
    sync::{Arc, LazyLock, RwLock},
6
    time::Duration,
7
};
8

9
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point, Slot};
10
use crossbeam_skiplist::SkipMap;
11
use rayon::prelude::*;
12
use strum::IntoEnumIterator;
13
use tracing::{debug, error};
14

15
use crate::{
16
    error::{Error, Result},
17
    mithril_snapshot_data::latest_mithril_snapshot_id,
18
    stats::{self},
19
};
20

21
/// Type we use to store the blocks of a Live Chain, ordered by their `Point`.
22
type LiveChainBlockList = SkipMap<Point, MultiEraBlock>;
23

24
/// Because we have multi-entry relationships in the live-chain, we need to protect it
25
/// with a `read/write lock`. The underlying `SkipMap` is still capable of multiple
26
/// simultaneous reads from multiple threads which is the most common access.
27
#[derive(Clone)]
28
struct ProtectedLiveChainBlockList(Arc<RwLock<LiveChainBlockList>>);
29

30
/// Handle to the mithril sync thread. One for each Network ONLY.
31
static LIVE_CHAINS: LazyLock<SkipMap<Network, ProtectedLiveChainBlockList>> = LazyLock::new(|| {
×
32
    let map = SkipMap::new();
×
33
    for network in Network::iter() {
×
34
        map.insert(network, ProtectedLiveChainBlockList::new());
×
35
    }
×
36
    map
×
37
});
×
38

39
/// Latest TIP received from the Peer Node.
40
static PEER_TIP: LazyLock<SkipMap<Network, Point>> = LazyLock::new(|| {
×
41
    let map = SkipMap::new();
×
42
    for network in Network::iter() {
×
43
        map.insert(network, Point::UNKNOWN);
×
44
    }
×
45
    map
×
46
});
×
47

48
/// Initial slot age to probe.
49
const INITIAL_SLOT_PROBE_AGE: u64 = 40;
50

51
/// Set the last TIP received from the peer.
52
fn update_peer_tip(chain: Network, tip: Point) {
×
53
    PEER_TIP.insert(chain, tip);
×
54
}
×
55

56
/// Get the last TIP received from the peer.
57
/// If the peer tip doesn't exist, get the UNKNOWN point.
58
pub(crate) fn get_peer_tip(chain: Network) -> Point {
×
59
    (*PEER_TIP.get_or_insert(chain, Point::UNKNOWN).value()).clone()
×
60
}
×
61

62
/// Number of seconds to wait if we detect a `SyncReady` race condition.
63
const DATA_RACE_BACKOFF_SECS: u64 = 2;
64

65
impl ProtectedLiveChainBlockList {
66
    /// Create a new instance of the protected Live Chain skip map.
67
    fn new() -> Self {
×
68
        ProtectedLiveChainBlockList(Arc::new(RwLock::new(LiveChainBlockList::new())))
×
69
    }
×
70

71
    /// Get the `nth` Live block immediately following the specified block.
72
    /// If the search is NOT strict, then the point requested is never found.
73
    /// 0 = The Block immediately after the requested point.
74
    /// 1+ = The block that follows the block after the requested point
75
    /// negative = The block before the requested point.
76
    fn get_block(&self, point: &Point, mut advance: i64, strict: bool) -> Option<MultiEraBlock> {
×
77
        let chain = self.0.read().ok()?;
×
78

79
        let mut this = if strict {
×
80
            chain.get(point)?
×
81
        } else if advance < 0 {
×
82
            // This is a fuzzy lookup backwards.
83
            advance = advance.saturating_add(1);
×
84
            chain.upper_bound(Bound::Excluded(point))?
×
85
        } else {
86
            // This is a fuzzy lookup forwards.
87
            chain.lower_bound(Bound::Excluded(point))?
×
88
        };
89

90
        // If we are stepping backwards, look backwards.
91
        while advance < 0 {
×
92
            advance = advance.saturating_add(1);
×
93
            this = this.prev()?;
×
94
        }
95

96
        // If we are stepping forwards, look forwards.
97
        while advance > 0 {
×
98
            advance = advance.saturating_sub(1);
×
99
            this = this.next()?;
×
100
        }
101

102
        // Return the block we found.
103
        Some(this.value().clone())
×
104
    }
×
105

106
    /// Get the earliest block in the Live Chain
107
    fn get_earliest_block(&self) -> Option<MultiEraBlock> {
×
108
        let chain = self.0.read().ok()?;
×
109
        let entry = chain.front()?;
×
110
        Some(entry.value().clone())
×
111
    }
×
112

113
    /// Get the point of the first known block in the Live Chain.
114
    fn get_first_live_point(live_chain: &LiveChainBlockList) -> Result<Point> {
×
115
        let Some(check_first_live_entry) = live_chain.front() else {
×
116
            return Err(Error::LiveSync(
×
117
                "First Block not found in the Live Chain during Backfill".to_string(),
×
118
            ));
×
119
        };
120
        let check_first_live_block = check_first_live_entry.value();
×
121
        Ok(check_first_live_block.point())
×
122
    }
×
123

124
    /// Get the point of the last known block in the Live Chain.
125
    fn get_last_live_point(live_chain: &LiveChainBlockList) -> Point {
×
126
        let Some(check_last_live_entry) = live_chain.back() else {
×
127
            // Its not an error if we can't get a latest block because the chain is empty,
128
            // so report that we don't know...
129
            return Point::UNKNOWN;
×
130
        };
131
        let check_last_live_block = check_last_live_entry.value();
×
132
        check_last_live_block.point()
×
133
    }
×
134

135
    /// Atomic Backfill the chain with the given blocks
136
    /// Blocks must be sorted in order from earliest to latest.
137
    /// Final block MUST seamlessly link to the current head of the live chain. (Enforced)
138
    /// First block MUST seamlessly link to the Tip of the Immutable chain. (Enforced)
139
    /// The blocks MUST be contiguous and properly self referential.
140
    /// Note: This last condition is NOT enforced, but must be met or block chain
141
    /// iteration will fail.
142
    fn backfill(&self, chain: Network, blocks: &[MultiEraBlock]) -> Result<()> {
×
143
        let live_chain = self.0.write().map_err(|_| Error::Internal)?;
×
144

145
        // Make sure our first live block == the last mithril tip.
146
        // Ensures we are properly connected to the Mithril Chain.
147
        let first_block_point = blocks
×
148
            .first()
×
149
            .ok_or(Error::LiveSync("No first block for backfill.".to_string()))?
×
150
            .point();
×
151
        let latest_mithril_tip = latest_mithril_snapshot_id(chain).tip();
×
152
        if !first_block_point.strict_eq(&latest_mithril_tip) {
×
153
            return Err(Error::LiveSync(format!(
×
154
                "First Block of Live BackFill {first_block_point} MUST be last block of Mithril Snapshot {latest_mithril_tip}."
×
155
            )));
×
156
        }
×
157

158
        // Get the current Oldest block in the live chain.
159
        let check_first_live_point = Self::get_first_live_point(&live_chain)?;
×
160

161
        let last_backfill_block = blocks
×
162
            .last()
×
163
            .ok_or(Error::LiveSync("No last block for backfill.".to_string()))?
×
164
            .clone();
×
165
        let last_backfill_point = last_backfill_block.point();
×
166

×
167
        // Make sure the backfill will properly connect the partial Live chain to the Mithril
×
168
        // chain.
×
169
        if !last_backfill_point.strict_eq(&check_first_live_point) {
×
170
            return Err(Error::LiveSync(format!(
×
171
                "Last Block of Live BackFill {last_backfill_point} MUST be First block of current Live Chain {check_first_live_point}."
×
172
            )));
×
173
        }
×
174

×
175
        // SkipMap is thread-safe, so we can parallel iterate inserting the blocks.
×
176
        blocks.par_iter().for_each(|block| {
×
177
            let _unused = live_chain.insert(block.point(), block.clone());
×
178
        });
×
179

×
180
        // End of Successful backfill == Reaching TIP, because live sync is always at tip.
×
181
        stats::tip_reached(chain);
×
182

×
183
        Ok(())
×
184
    }
×
185

186
    /// Check if the given point is strictly in the live-chain. This means the slot and
187
    /// block hash MUST be present.
188
    fn strict_block_lookup(live_chain: &LiveChainBlockList, point: &Point) -> bool {
×
189
        if let Some(found_block) = live_chain.get(point) {
×
190
            return found_block.value().point().strict_eq(point);
×
191
        }
×
192
        false
×
193
    }
×
194

195
    /// Adds a block to the tip of the live chain, and automatically purges blocks that
196
    /// would be lost due to rollback. Will REFUSE to add a block which does NOT have
197
    /// a proper "previous" point defined.
198
    fn add_block_to_tip(
×
199
        &self, chain: Network, block: MultiEraBlock, fork_count: &mut Fork, tip: Point,
×
200
    ) -> Result<()> {
×
201
        let live_chain = self.0.write().map_err(|_| Error::Internal)?;
×
202

203
        // Check if the insert is the next logical block in the live chain.
204
        // Most likely case, so check it first.
205
        let previous_point = block.previous();
×
206
        let last_live_point = Self::get_last_live_point(&live_chain);
×
207
        if !previous_point.strict_eq(&last_live_point) {
×
208
            // Detected a rollback, so increase the fork count.
209
            fork_count.incr();
×
210
            let mut rollback_size: u64 = 0;
×
211

×
212
            // We are NOT contiguous, so check if we can become contiguous with a rollback.
×
213
            debug!("Detected non-contiguous block, rolling back. Fork: {fork_count}");
×
214

215
            // First check if the previous is >= the earliest block in the live chain.
216
            // This is because when we start syncing we could rollback earlier than our
217
            // previously known earliest block.
218
            // Also check the point we want to link to actually exists.  If either are not true,
219
            // Then we could be trying to roll back to an earlier block than our earliest known
220
            // block.
221
            let check_first_live_point = Self::get_first_live_point(&live_chain)?;
×
222
            if (block.point() < check_first_live_point)
×
223
                || !Self::strict_block_lookup(&live_chain, &previous_point)
×
224
            {
225
                debug!("Rollback before live chain, clear it.");
×
226
                // We rolled back earlier than the current live chain.
227
                // Purge the entire chain, and just add this one block as the new tip.
228
                rollback_size = live_chain.len() as u64;
×
229
                live_chain.clear();
×
230
            } else {
231
                // If we get here we know for a fact that the previous block exists.
232
                // Remove the latest live block, and keep removing it until we re-establish
233
                // connection with the chain sequence.
234
                // We search backwards because a rollback is more likely in the newest blocks than
235
                // the oldest.
236
                while let Some(popped) = live_chain.pop_back() {
×
237
                    rollback_size = rollback_size.saturating_add(1);
×
238
                    if previous_point.strict_eq(&popped.value().previous()) {
×
239
                        // We are now contiguous, so stop purging.
240
                        break;
×
241
                    }
×
242
                }
243
            }
244

245
            // Record a rollback statistic (We record the ACTUAL size our rollback effected our
246
            // internal live chain, not what the node thinks.)
247
            stats::rollback::rollback(
×
248
                chain,
×
249
                stats::rollback::RollbackType::LiveChain,
×
250
                rollback_size,
×
251
            );
×
252
        }
×
253

254
        let head_slot = block.point().slot_or_default();
×
255

×
256
        // Add the block to the tip of the Live Chain.
×
257
        let _unused = live_chain.insert(block.point(), block);
×
258

×
259
        let tip_slot = tip.slot_or_default();
×
260
        update_peer_tip(chain, tip);
×
261

×
262
        // Record the new live chain stats after we add a new block.
×
263
        stats::new_live_block(chain, live_chain.len() as u64, head_slot, tip_slot);
×
264

×
265
        Ok(())
×
266
    }
×
267

268
    /// Checks if the point exists in the live chain.
269
    /// If it does, removes all block preceding it (but not the point itself).
270
    /// Will refuse to purge if the point is not the TIP of the mithril chain.
271
    fn purge(&self, chain: Network, point: &Point) -> Result<()> {
×
272
        // Make sure our first live block == the last mithril tip.
×
273
        // Ensures we are properly connected to the Mithril Chain.
×
274
        // But don't check this if we are about to purge the entire chain.
×
275
        // We do this before we bother locking the chain for update.
×
276
        if *point != Point::TIP {
×
277
            let latest_mithril_tip = latest_mithril_snapshot_id(chain).tip();
×
278
            if !point.strict_eq(&latest_mithril_tip) {
×
279
                return Err(Error::LiveSync(format!(
×
280
                "First Block of Live Purge {point} MUST be last block of Mithril Snapshot {latest_mithril_tip}."
×
281
            )));
×
282
            }
×
283
        }
×
284

285
        let live_chain = self.0.write().map_err(|_| Error::Internal)?;
×
286

287
        // Special Case.
288
        // If the Purge Point == TIP_POINT, then we purge the entire chain.
289
        if *point == Point::TIP {
×
290
            live_chain.clear();
×
291
        } else {
×
292
            // If the block we want to purge upto must be in the chain.
293
            let Some(purge_start_block_entry) = live_chain.get(point) else {
×
294
                return Err(Error::LiveSync(format!(
×
295
                    "The block to purge to {point} is not in the Live chain."
×
296
                )));
×
297
            };
298

299
            // Make sure the block that IS present, is the actual block, by strict equality.
300
            if !purge_start_block_entry.value().point().strict_eq(point) {
×
301
                return Err(Error::LiveSync(format!(
×
302
                "The block to purge to {point} slot is in the live chain, but its hashes do not match."
×
303
            )));
×
304
            }
×
305

306
            // Purge every block prior to the purge point.
307
            while let Some(previous_block) = purge_start_block_entry.prev() {
×
308
                let _unused = previous_block.remove();
×
309
            }
×
310

311
            // Try and FORCE the skip map to reclaim its memory
312
            crossbeam_epoch::pin().flush();
×
313
            crossbeam_epoch::pin().flush();
×
314
        }
315

316
        Ok(())
×
317
    }
×
318

319
    /// Get the current number of blocks in the live chain
320
    fn len(&self) -> usize {
×
321
        if let Ok(chain) = self.0.read() {
×
322
            chain.len()
×
323
        } else {
324
            0
×
325
        }
326
    }
×
327

328
    /// Get chain sync intersection points for communicating with peer node.
329
    fn get_intersect_points(&self) -> Vec<pallas::network::miniprotocols::Point> {
×
330
        let mut intersect_points = Vec::new();
×
331

332
        let Ok(chain) = self.0.read() else {
×
333
            return intersect_points;
×
334
        };
335

336
        // Add the top 4 blocks as the first points to intersect.
337
        let Some(entry) = chain.back() else {
×
338
            return intersect_points;
×
339
        };
340
        intersect_points.push(entry.value().point().into());
×
341
        for _ in 0..2 {
×
342
            if let Some(entry) = entry.prev() {
×
343
                intersect_points.push(entry.value().point().into());
×
344
            } else {
×
345
                return intersect_points;
×
346
            };
347
        }
348

349
        // Now find points based on an every increasing Slot age.
350
        let mut slot_age: Slot = INITIAL_SLOT_PROBE_AGE.into();
×
351
        let reference_slot = entry.value().point().slot_or_default();
×
352
        let mut previous_point = entry.value().point();
×
353

354
        // Loop until we exhaust probe slots, OR we would step past genesis.
355
        // It is ok because slot implement saturating subtraction.
356
        #[allow(clippy::arithmetic_side_effects)]
357
        while slot_age < reference_slot {
×
358
            let ref_point = Point::fuzzy(reference_slot - slot_age);
×
359
            let Some(entry) = chain.lower_bound(Bound::Included(&ref_point)) else {
×
360
                break;
×
361
            };
362
            if entry.value().point() == previous_point {
×
363
                break;
×
364
            };
×
365
            previous_point = entry.value().point();
×
366
            intersect_points.push(previous_point.clone().into());
×
367
            slot_age *= 2;
×
368
        }
369

370
        intersect_points
×
371
    }
×
372

373
    /// Given a known point on the live chain, and a fork count, find the best block we
374
    /// have.
375
    fn find_best_fork_block(
×
376
        &self, point: &Point, previous_point: &Point, fork: Fork,
×
377
    ) -> Option<(MultiEraBlock, u64)> {
×
378
        let mut rollback_depth: u64 = 0;
×
379
        let Ok(chain) = self.0.read() else {
×
380
            return None;
×
381
        };
382

383
        // Get the block <= the current slot.
384
        let ref_point = Point::fuzzy(point.slot_or_default());
×
385
        let mut entry = chain.upper_bound(Bound::Included(&ref_point))?;
×
386

387
        let mut this_block = entry.value().clone();
×
388
        // Check if the previous block is the one we previously knew, and if so, thats the best
×
389
        // block.
×
390
        if this_block.point().strict_eq(previous_point) {
×
391
            return Some((this_block, rollback_depth));
×
392
        }
×
393

394
        // Search backwards for a fork smaller than or equal to the one we know.
395
        while this_block.fork() > fork {
×
396
            rollback_depth = rollback_depth.saturating_add(1);
×
397
            entry = entry.prev()?;
×
398

399
            this_block = entry.value().clone();
×
400
        }
401

402
        Some((this_block, rollback_depth))
×
403
    }
×
404

405
    /// Get the point of the block at the head of the live chain.
406
    fn get_live_head_point(&self) -> Option<Point> {
×
407
        let live_chain = self.0.read().map_err(|_| Error::Internal).ok()?;
×
408

409
        let head_point = Self::get_last_live_point(&live_chain);
×
410
        if head_point == Point::UNKNOWN {
×
411
            return None;
×
412
        }
×
413

×
414
        Some(head_point)
×
415
    }
×
416
}
417

418
/// Get the `LiveChainBlockList` for a particular `Network`.
419
fn get_live_chain(chain: Network) -> ProtectedLiveChainBlockList {
×
420
    // Get a reference to our live chain storage.
421
    // This SHOULD always exist, because its initialized exhaustively.
422
    // If this FAILS, Recreate a blank chain, but log an error as its a serious UNRECOVERABLE
423
    // BUG.
424
    let entry = if let Some(entry) = LIVE_CHAINS.get(&chain) {
×
425
        entry
×
426
    } else {
427
        error!(
×
428
            chain = chain.to_string(),
×
429
            "Internal Error: Chain Sync Failed to find chain in LIVE_CHAINS"
×
430
        );
431

432
        // Try and correct the error.
433
        LIVE_CHAINS.insert(chain, ProtectedLiveChainBlockList::new());
×
434

×
435
        // This should NOT fail, because we just inserted it, its catastrophic failure if it does.
×
436
        #[allow(clippy::expect_used)]
×
437
        LIVE_CHAINS
×
438
            .get(&chain)
×
439
            .expect("Internal Error: Chain Sync Failed to find chain in LIVE_CHAINS")
×
440
    };
441

442
    let value = entry.value();
×
443
    value.clone()
×
444
}
×
445

446
/// Get the head `Point` currently in the live chain.
447
pub(crate) fn get_live_head_point(chain: Network) -> Option<Point> {
×
448
    let live_chain = get_live_chain(chain);
×
449
    live_chain.get_live_head_point()
×
450
}
×
451

452
/// Get the Live block relative to the specified point.
453
/// The starting block must exist if the search is strict.
454
pub(crate) fn get_live_block(
×
455
    chain: Network, point: &Point, advance: i64, strict: bool,
×
456
) -> Option<MultiEraBlock> {
×
457
    let live_chain = get_live_chain(chain);
×
458
    live_chain.get_block(point, advance, strict)
×
459
}
×
460

461
/// Get the fill to point for a chain.
462
///
463
/// Returns the Point of the block we are filling up-to, and it's fork count.
464
///
465
/// Note: It MAY change between calling this function and actually backfilling.
466
/// This is expected and normal behavior.
467
pub(crate) async fn get_fill_to_point(chain: Network) -> (Point, u64) {
×
468
    let live_chain = get_live_chain(chain);
×
469

470
    loop {
471
        if let Some(earliest_block) = live_chain.get_earliest_block() {
×
472
            return (earliest_block.point(), earliest_block.fork().into());
×
473
        }
×
474
        // Nothing in the Live chain to sync to, so wait until there is.
×
475
        tokio::time::sleep(Duration::from_secs(DATA_RACE_BACKOFF_SECS)).await;
×
476
    }
477
}
×
478

479
/// Insert a block into the live chain (in-order).
480
/// Can ONLY be used to add a new tip block to the live chain.
481
/// `rollback_count` should be set to 1 on the very first connection, after that,
482
/// it is maintained by this function, and MUST not be modified elsewhere.
483
pub(crate) fn live_chain_add_block_to_tip(
×
484
    chain: Network, block: MultiEraBlock, fork: &mut Fork, tip: Point,
×
485
) -> Result<()> {
×
486
    let live_chain = get_live_chain(chain);
×
487
    live_chain.add_block_to_tip(chain, block, fork, tip)
×
488
}
×
489

490
/// Backfill the live chain with the block set provided.
491
pub(crate) fn live_chain_backfill(chain: Network, blocks: &[MultiEraBlock]) -> Result<()> {
×
492
    let live_chain = get_live_chain(chain);
×
493
    live_chain.backfill(chain, blocks)
×
494
}
×
495

496
/// Get the length of the live chain.
497
/// Probably used by debug code only, so its ok if this is not use.
498
pub(crate) fn live_chain_length(chain: Network) -> usize {
×
499
    let live_chain = get_live_chain(chain);
×
500
    live_chain.len()
×
501
}
×
502

503
/// On an immutable update, purge the live-chain up to the new immutable tip.
504
/// Will error if the point is not in the Live chain.
505
pub(crate) fn purge_live_chain(chain: Network, point: &Point) -> Result<()> {
×
506
    let live_chain = get_live_chain(chain);
×
507
    live_chain.purge(chain, point)
×
508
}
×
509

510
/// Get intersection points to try and find best point to connect to the node on
511
/// reconnect.
512
pub(crate) fn get_intersect_points(chain: Network) -> Vec<pallas::network::miniprotocols::Point> {
×
513
    let live_chain = get_live_chain(chain);
×
514
    live_chain.get_intersect_points()
×
515
}
×
516

517
/// Find best block from a fork relative to a point.
518
pub(crate) fn find_best_fork_block(
×
519
    chain: Network, point: &Point, previous_point: &Point, fork: Fork,
×
520
) -> Option<(MultiEraBlock, u64)> {
×
521
    let live_chain = get_live_chain(chain);
×
522
    live_chain.find_best_fork_block(point, previous_point, fork)
×
523
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc