• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

input-output-hk / catalyst-libs / 16054905839

03 Jul 2025 03:45PM UTC coverage: 65.701% (+0.02%) from 65.683%
16054905839

Pull #405

github

web-flow
Merge 30c505c79 into 31f00a4a8
Pull Request #405: fix(rust/rbac-registration): Fix `validate_txn_inputs_hash` error message

0 of 1 new or added line in 1 file covered. (0.0%)

6 existing lines in 2 files now uncovered.

10972 of 16700 relevant lines covered (65.7%)

2614.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/rust/cardano-chain-follower/src/mithril_turbo_downloader.rs
1
//! Turbo Downloads for Mithril Snapshots.
2

3
use std::{
4
    cmp,
5
    ffi::OsStr,
6
    io::{BufReader, ErrorKind, Read},
7
    path::{Path, PathBuf},
8
    // process::Stdio,
9
    sync::{
10
        atomic::{AtomicU64, Ordering},
11
        Arc, OnceLock,
12
    },
13
};
14

15
use anyhow::{anyhow, bail};
16
use async_trait::async_trait;
17
use catalyst_types::{conversion::from_saturating, mmap_file::MemoryMapFile};
18
use dashmap::DashSet;
19
use memx::memcmp;
20
use mithril_client::{
21
    common::CompressionAlgorithm,
22
    file_downloader::{DownloadEvent, FileDownloader, FileDownloaderUri},
23
    MithrilResult,
24
};
25
use tar::{Archive, EntryType};
26
use tokio::{fs::create_dir_all, task::spawn_blocking};
27
use tracing::{debug, error};
28
use zstd::Decoder;
29

30
use crate::{
31
    mithril_snapshot_config::MithrilSnapshotConfig,
32
    mithril_snapshot_data::latest_mithril_snapshot_data,
33
    stats::{self},
34
    turbo_downloader::ParallelDownloadProcessor,
35
};
36

37
/// Shared inner state for the turbo Mithril snapshot downloader.
///
/// NOTE(review): the previous doc said this "accelerates Download using `aria2`",
/// but the code uses the in-crate `ParallelDownloadProcessor` — confirm whether
/// `aria2` is still involved and update this doc accordingly.
pub struct Inner {
    /// Configuration for the snapshot sync.
    cfg: MithrilSnapshotConfig,
    /// Set of chunk files (`*.chunk`) that were new or changed in this download,
    /// filled in by the `changed_file!` / `new_file!` macros.
    new_chunks: Arc<DashSet<PathBuf>>,

    /// The number of files that were new in this download.
    new_files: AtomicU64,
    /// The number of files that changed in this download.
    chg_files: AtomicU64,
    /// The total number of files in the download.
    tot_files: AtomicU64,
    /// The total size of the files extracted in the download.
    ext_size: AtomicU64,
    /// The total size of the files we deduplicated.
    dedup_size: AtomicU64,

    /// The download processor for the current file download.
    /// Set exactly once (see `probe`) and read by the extraction/dedup pass.
    dl_handler: std::sync::OnceLock<ParallelDownloadProcessor>,
}
58

59
/// This macro is what happens every time the file is different from previous.
///
/// Bumps the changed-file counter and, when the file is a `*.chunk` file,
/// records its absolute path in `new_chunks`.
///
/// NOTE(review): `$rel_file` and `$new_size` are accepted but unused in the
/// expansion; presumably kept so call sites stay symmetric with `new_file!` —
/// confirm before removing.
macro_rules! changed_file {
    ($self:ident, $rel_file:ident, $abs_file:ident, $new_size:ident) => {
        // Count this file as changed relative to the previous snapshot.
        $self.chg_files.fetch_add(1, Ordering::SeqCst);
        // Only chunk files are tracked in the new-chunks set.
        if $abs_file.extension() == Some(OsStr::new("chunk")) {
            $self.new_chunks.insert($abs_file);
        }
    };
}
68

69
/// This macro is what happens every time we decide the file can't be deduplicated.
///
/// Bumps the new-file counter and, when the file is a `*.chunk` file,
/// records its absolute path in `new_chunks`.
///
/// NOTE(review): `$rel_file` and `$new_size` are accepted but unused in the
/// expansion; presumably kept so call sites stay symmetric with
/// `changed_file!` — confirm before removing.
macro_rules! new_file {
    ($self:ident, $rel_file:ident, $abs_file:ident, $new_size:ident) => {
        // Count this file as brand new (not deduplicated).
        $self.new_files.fetch_add(1, Ordering::SeqCst);
        // Only chunk files are tracked in the new-chunks set.
        if $abs_file.extension() == Some(OsStr::new("chunk")) {
            $self.new_chunks.insert($abs_file);
        }
    };
}
78

79
impl Inner {
    /// Synchronous Download and Dedup archive.
    ///
    /// Stream Downloads and Decompresses files, and deduplicates them as they are
    /// extracted from the embedded tar archive.
    ///
    /// Per Entry:
    ///   If the file is NOT to be deduplicated, OR a previous file with the same
    ///   name and size does not exist, then just extract it where it's supposed
    ///   to go.
    ///
    /// To Dedup, the original file is memory-mapped.
    /// The new file is extracted to an in-memory buffer.
    /// If they compare the same, the original file is `HardLinked` to the new file name.
    /// Otherwise the new file buffer is saved to disk with the new file name.
    ///
    /// NOTE(review): `_location` and `_target_dir` are unused here (the data
    /// comes from `dl_handler` and goes to `cfg.tmp_path()`); presumably kept
    /// for interface symmetry with the async wrapper — confirm.
    fn dl_and_dedup(&self, _location: &str, _target_dir: &Path) -> MithrilResult<()> {
        let mut archive = self.create_archive_extractor()?;

        // Iterate the files in the archive.
        let entries = match archive.entries() {
            Ok(entries) => entries,
            Err(error) => bail!("Failed to get entries from the archive: {error}"),
        };

        // All extraction happens into the tmp path; dedup compares against the
        // latest previously-synced snapshot.
        let tmp_dir = self.cfg.tmp_path();
        let latest_snapshot = latest_mithril_snapshot_data(self.cfg.chain);

        for entry in entries {
            let mut entry = match entry {
                Ok(entry) => entry,
                Err(error) => bail!("Failed to get an entry from the archive: {error}"),
            };
            let rel_file = entry.path()?.to_path_buf();
            let entry_size = entry.size();

            // debug!(chain = %self.cfg.chain, "DeDup : Extracting {}:{} loc {location} target {}",
            // rel_file.to_string_lossy(), entry_size, target_dir.to_string_lossy());

            // Check if we need to extract this path or not.
            if !self.check_for_extract(&rel_file, entry.header().entry_type()) {
                continue;
            }

            // Count total files processed.
            self.tot_files.fetch_add(1, Ordering::SeqCst);

            // Absolute destination of this entry inside the tmp dir.
            let mut abs_file = tmp_dir.clone();
            abs_file.push(rel_file.clone());

            // Path of the same file in the previous snapshot, if that snapshot exists.
            let mut prev_file = latest_snapshot.id().path_if_exists();
            if let Some(prev_file) = &mut prev_file {
                prev_file.push(rel_file.clone());
            }

            // debug!(chain = %self.cfg.chain, "DeDup : tmp_dir {} abs_file {} prev_file
            // {prev_file:?}", tmp_dir.to_string_lossy(), abs_file.to_string_lossy() );

            self.ext_size.fetch_add(entry_size, Ordering::SeqCst);

            // Try and deduplicate the file if we can, otherwise just extract it.
            if let Ok(prev_mmap) = Self::can_deduplicate(&rel_file, entry_size, prev_file.as_ref())
            {
                // Read the whole entry into memory so it can be byte-compared
                // against the mmapped previous file.
                let expected_file_size = from_saturating(entry_size);
                let mut buf: Vec<u8> = Vec::with_capacity(expected_file_size);
                if entry.read_to_end(&mut buf)? != expected_file_size {
                    bail!(
                        "Failed to read file {} of size {} got {}",
                        rel_file.display(),
                        entry_size,
                        buf.len()
                    );
                }
                // Got the full file and its the expected size.  Is it different?
                if memcmp(prev_mmap.as_slice(), buf.as_slice()) == cmp::Ordering::Equal {
                    // Same so lets Hardlink it, and throw away the temp buffer.

                    // Make sure our big mmap get dropped.
                    drop(prev_mmap);

                    // File is the same, so dedup it.
                    if self.cfg.dedup_tmp(&abs_file, &latest_snapshot).is_ok() {
                        self.dedup_size.fetch_add(entry_size, Ordering::SeqCst);
                        changed_file!(self, rel_file, abs_file, entry_size);
                        drop(buf);
                        continue;
                    }
                    // Dedup failed: fall through and write the buffer out as-is.
                }

                if let Err(error) = std::fs::write(&abs_file, buf) {
                    error!(chain = %self.cfg.chain, "Failed to write file {} got {}", abs_file.display(), error);
                    bail!("Failed to write file {} got {}", abs_file.display(), error);
                }
            } else {
                // No dedup, just extract it into the tmp directory as-is.
                entry.unpack_in(&tmp_dir).inspect_err(|e| {
                    // Handle known I/O error kinds explicitly - `StorageFull`
                    // All other error kinds are logged as unhandled.
                    if e.kind() == ErrorKind::StorageFull {
                        error!(
                            chain = %self.cfg.chain,
                            error = %e,
                            "Storage full while extracting file {rel_file:?} with size {entry_size}"
                        );
                    } else {
                        error!(
                            chain = %self.cfg.chain,
                            error = %e,
                            "Unhandled I/O error kind: {}", e.kind());
                    }
                })?;
                debug!(chain = %self.cfg.chain, "DeDup: Extracted file {rel_file:?}:{entry_size}");
            }
            new_file!(self, rel_file, abs_file, entry_size);
        }

        let Some(dl_handler) = self.dl_handler.get() else {
            bail!("Failed to get the Parallel Download processor!");
        };

        debug!(chain = %self.cfg.chain, "Download {} bytes", dl_handler.dl_size());

        stats::mithril_dl_finished(self.cfg.chain, Some(dl_handler.dl_size()));

        Ok(())
    }

    /// Create a TAR archive extractor from the downloading file and a zstd decompressor.
    ///
    /// Reads the (still downloading) data stream from `dl_handler`, so this
    /// fails if `probe` has not set the download processor yet.
    fn create_archive_extractor(
        &self,
    ) -> MithrilResult<Archive<Decoder<'static, BufReader<BufReader<ParallelDownloadProcessor>>>>>
    {
        let Some(dl_handler) = self.dl_handler.get() else {
            bail!("Failed to get the Parallel Download processor!");
        };
        let buf_reader = BufReader::new(dl_handler.clone());
        let decoder = match zstd::Decoder::new(buf_reader) {
            Ok(decoder) => decoder,
            Err(error) => bail!("Failed to create ZSTD decoder: {error}"),
        };
        Ok(tar::Archive::new(decoder))
    }

    /// Check if we are supposed to extract this file from the archive or not.
    ///
    /// Returns `false` (skip) for absolute paths, bare directories, and any
    /// non-regular-file entry type; `true` only for relative regular files.
    ///
    /// NOTE(review): this rejects absolute paths but not `..` components.
    /// `unpack_in` guards its own extraction path, but the dedup branch writes
    /// to `tmp_dir.join(rel_file)` directly — confirm archives are trusted, or
    /// also reject paths containing `Component::ParentDir`.
    fn check_for_extract(&self, path: &Path, extract_type: EntryType) -> bool {
        if path.is_absolute() {
            error!(chain = %self.cfg.chain, "DeDup : Cannot extract an absolute path:  {:?}", path);
            return false;
        }

        if extract_type.is_dir() {
            // We don't do anything with just a path, so skip it.
            return false;
        }

        if !extract_type.is_file() {
            error!(chain  = %self.cfg.chain, "DeDup : Cannot extract a non-file: {:?}:{:?}", path, extract_type);
            return false;
        }

        true
    }

    /// Check if a given path from the archive is able to be deduplicated.
    ///
    /// On success returns the previous snapshot's file, already memory-mapped,
    /// ready for byte comparison.  Errors simply mean "do not dedup" — the
    /// caller extracts the file normally.
    fn can_deduplicate(
        rel_file: &Path, file_size: u64, prev_file: Option<&PathBuf>,
    ) -> MithrilResult<MemoryMapFile> {
        // Can't dedup if the current file is not de-dupable (must be immutable)
        if rel_file.starts_with("immutable") {
            // Can't dedup if we don't have a previous file to dedup against.
            if let Some(prev_file) = prev_file {
                if let Some(current_size) = get_file_size_sync(prev_file) {
                    // If the current file is not exactly the same as the previous file size, we
                    // can't dedup.
                    if file_size == current_size {
                        if let Ok(pref_file_loaded) = Self::mmap_open_sync(prev_file) {
                            // Belt-and-braces: re-check the mapped size matches, in case the
                            // file changed between the metadata call and the mmap.
                            if pref_file_loaded.size() == file_size {
                                return Ok(pref_file_loaded);
                            }
                        }
                    }
                }
            }
        }
        bail!("Can not deduplicate.");
    }

    /// Open a file using mmap for performance.
    ///
    /// Logs and returns the error on failure.
    fn mmap_open_sync(path: &Path) -> MithrilResult<MemoryMapFile> {
        match MemoryMapFile::try_from(path) {
            Ok(mmap_file) => Ok(mmap_file),
            Err(error) => {
                error!(error=%error, file=%path.to_string_lossy(), "Failed to open file");
                Err(error.into())
            },
        }
    }
}
275

276
/// A snapshot downloader that accelerates downloads by fetching the archive in
/// parallel and deduplicating files against the previous snapshot.
///
/// NOTE(review): the previous doc mentioned `aria2`; the implementation uses
/// `ParallelDownloadProcessor` — confirm and update if `aria2` is obsolete.
pub struct MithrilTurboDownloader {
    /// Inner arc-wrapped state, shared with the blocking download/dedup task.
    inner: Arc<Inner>,
}
281

282
impl MithrilTurboDownloader {
283
    /// Constructs a new `HttpSnapshotDownloader`.
284
    pub fn new(cfg: MithrilSnapshotConfig) -> Self {
×
285
        // Test if the HTTP Client can properly be created.
×
286
        let dl_config = cfg.dl_config.clone().unwrap_or_default();
×
287

×
288
        let cfg = cfg.with_dl_config(dl_config);
×
289

×
290
        Self {
×
291
            inner: Arc::new(Inner {
×
292
                cfg,
×
293
                new_chunks: Arc::new(DashSet::new()),
×
294
                new_files: AtomicU64::new(0),
×
295
                chg_files: AtomicU64::new(0),
×
296
                tot_files: AtomicU64::new(0),
×
297
                ext_size: AtomicU64::new(0),
×
298
                dedup_size: AtomicU64::new(0),
×
299
                dl_handler: OnceLock::new(),
×
300
            }),
×
301
        }
×
302
    }
×
303

304
    /// Take the hashmap for the previous download.
305
    pub fn get_new_chunks(&self) -> Arc<DashSet<PathBuf>> {
×
306
        self.inner.new_chunks.clone()
×
307
    }
×
308

309
    /// Create directories required to exist for download to succeed.
310
    async fn create_directories(&self, target_dir: &Path) -> MithrilResult<()> {
×
311
        if let Err(error) = create_dir_all(target_dir).await {
×
312
            let msg = format!(
×
313
                "Target directory {} could not be created: {}",
×
314
                target_dir.to_string_lossy(),
×
315
                error
×
316
            );
×
317
            Err(anyhow!(msg.clone()).context(msg))?;
×
318
        }
×
319

320
        Ok(())
×
321
    }
×
322

323
    /// Parallel Download, Extract and Dedup the Mithril Archive.
324
    async fn dl_and_dedup(&self, location: &str, target_dir: &Path) -> MithrilResult<()> {
×
325
        // Get a copy of the inner data to use in the sync download task.
×
326
        let inner = self.inner.clone();
×
327
        let location = location.to_owned();
×
328
        let target_dir = target_dir.to_owned();
×
329

330
        // This is fully synchronous IO, so do it on a sync thread.
331
        let result = spawn_blocking(move || {
×
332
            stats::start_thread(
×
333
                inner.cfg.chain,
×
334
                stats::thread::name::MITHRIL_DL_DEDUP,
×
335
                false,
×
336
            );
×
337
            let result = inner.dl_and_dedup(&location, &target_dir);
×
338
            stats::stop_thread(inner.cfg.chain, stats::thread::name::MITHRIL_DL_DEDUP);
×
339
            result
×
340
        })
×
341
        .await;
×
342

343
        // Must always be called when download ends, regardless of the reason.
344
        stats::mithril_dl_finished(self.inner.cfg.chain, None);
×
345

346
        if let Ok(result) = result {
×
347
            return result;
×
348
        }
×
349

×
350
        bail!("Download and Dedup task failed");
×
351
    }
×
352
}
353

354
/// Get the size of a particular file.  None = failed to get size (doesn't matter why).
fn get_file_size_sync(file: &Path) -> Option<u64> {
    // Any metadata failure (missing file, permissions, ...) collapses to `None`.
    file.metadata().map(|metadata| metadata.len()).ok()
}
×
361

362
#[async_trait]
impl FileDownloader for MithrilTurboDownloader {
    /// Download, unpack and dedup a Mithril snapshot archive.
    ///
    /// Only `DownloadEvent::Full` is supported; any other event type is an
    /// error.  On success, extraction statistics are recorded.
    async fn download_unpack(
        &self, location: &FileDownloaderUri, file_size: u64, target_dir: &Path,
        compression_algorithm: Option<CompressionAlgorithm>, download_event_type: DownloadEvent,
    ) -> MithrilResult<()> {
        debug!(
            ?location,
            file_size,
            ?target_dir,
            ?compression_algorithm,
            ?download_event_type,
            "Download And Unpack Mithril."
        );

        // We only support full downloads for now.
        if !matches!(download_event_type, DownloadEvent::Full { .. }) {
            bail!("Unsupported Download Event Type: {:?}", download_event_type);
        }

        let location = location.as_str();

        // Probe was removed in FileDownloader, so call it directly
        self.probe(location).await?;

        self.create_directories(target_dir).await?;

        // DL Start stats set after DL actually started inside the probe call.
        self.dl_and_dedup(location, target_dir).await?;

        // Snapshot the per-download counters for the stats report.
        let tot_files = self.inner.tot_files.load(Ordering::SeqCst);
        let chg_files = self.inner.chg_files.load(Ordering::SeqCst);
        let new_files = self.inner.new_files.load(Ordering::SeqCst);

        // Unchanged = total - changed - new (saturating, so it can't underflow).
        stats::mithril_extract_finished(
            self.inner.cfg.chain,
            Some(self.inner.ext_size.load(Ordering::SeqCst)),
            self.inner.dedup_size.load(Ordering::SeqCst),
            tot_files
                .saturating_sub(chg_files)
                .saturating_sub(new_files),
            chg_files,
            new_files,
        );

        debug!("Download and Unpack finished='{location}' to '{target_dir:?}'.");

        Ok(())
    }
}
412

413
impl MithrilTurboDownloader {
414
    /// Set up the download.  
415
    /// Called `probe` as this used to exist in an earlier trait which was removed.
416
    async fn probe(&self, location: &str) -> MithrilResult<()> {
×
417
        debug!("Probe Snapshot location='{location}'.");
×
418
        let dl_config = self.inner.cfg.dl_config.clone().unwrap_or_default();
×
419
        let dl_processor =
×
420
            ParallelDownloadProcessor::new(location, dl_config, self.inner.cfg.chain).await?;
×
421

422
        // Decompress and extract and de-dupe each file in the archive.
423
        stats::mithril_extract_started(self.inner.cfg.chain);
×
424

×
425
        // We also immediately start downloading now.
×
426
        stats::mithril_dl_started(self.inner.cfg.chain);
×
427

428
        // Save the DownloadProcessor in the inner struct for use to process the downloaded data.
429
        if let Err(_error) = self.inner.dl_handler.set(dl_processor) {
×
430
            bail!("Failed to set the inner dl_handler. Must already be set?");
×
431
        }
×
432

×
433
        Ok(())
×
434
    }
×
435
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc