
input-output-hk / catalyst-libs / build 15142981637

20 May 2025 04:33PM UTC coverage: 65.619%. First build.

Pull Request #342: feat(docs): Add payload examples (github / web-flow)
Merge e2d902de3 into 2971d7241

10944 of 16678 relevant lines covered (65.62%)

2481.61 hits per line

Source File

/rust/cardano-chain-follower/src/turbo_downloader/mod.rs (0.0% covered)
//! Serializable Parallel Download Processor
//!
//! Provides the capability to quickly download a large file using parallel connections,
//! but still process the data sequentially, without requiring the entire file to be
//! downloaded at once.
//!
//! NOTE: This uses synchronous threading and HTTP Gets because Async proved to be highly
//! variable in its performance.

use std::{
    io::Read,
    net::SocketAddr,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc, Mutex, OnceLock,
    },
    thread,
    time::Duration,
};

use anyhow::{anyhow, bail, Context, Result};
use cardano_blockchain_types::Network;
use catalyst_types::conversion::from_saturating;
use crossbeam_channel::{Receiver, RecvError};
use dashmap::DashMap;
use http::{
    header::{ACCEPT_RANGES, CONTENT_LENGTH, RANGE},
    StatusCode,
};
use tracing::{debug, error};

use crate::stats;

/// A Simple DNS Balancing Resolver
struct BalancingResolver {
    /// The actual resolver
    resolver: hickory_resolver::Resolver,
    /// A Cache of the Sockets we already resolved for a URL.
    cache: moka::sync::Cache<String, Arc<Vec<SocketAddr>>>,
}

/// We only have one resolver.
static RESOLVER: OnceLock<BalancingResolver> = OnceLock::new();

impl BalancingResolver {
    /// Initialize the resolver, only does something once, but safe to call multiple
    /// times.
    fn init(_cfg: &DlConfig) -> Result<()> {
        // Can ONLY init the Resolver once, just return if we try and do it multiple times.
        if RESOLVER.get().is_none() {
            // Construct a new Resolver with default configuration options
            let resolver = match hickory_resolver::Resolver::from_system_conf() {
                Ok(r) => r,
                Err(e) => {
                    error!("Failed to initialize DNS Balancing Resolver from system configuration, using Google DNS as fallback: {}", e);
                    hickory_resolver::Resolver::new(
                        hickory_resolver::config::ResolverConfig::default(),
                        hickory_resolver::config::ResolverOpts::default(),
                    )?
                },
            };

            let cache = moka::sync::Cache::builder()
                // We should not be caching lots of different URLs.
                .max_capacity(10)
                // Time to live (TTL): 60 minutes
                .time_to_live(Duration::from_secs(60 * 60))
                // Time to idle (TTI): 5 minutes
                .time_to_idle(Duration::from_secs(5 * 60))
                // Create the cache.
                .build();

            // We don't really care if this is already set.
            let _unused = RESOLVER.set(BalancingResolver { resolver, cache });
        }
        Ok(())
    }

    /// Resolve the given URL with the configured resolver.
    fn resolve(&self, url: &str, worker: usize) -> std::io::Result<Vec<SocketAddr>> {
        // debug!("Resolving: {url} for {worker}");
        let addresses = if let Some(addresses) = self.cache.get(url) {
            addresses
        } else {
            let Some((host, port_str)) = url.split_once(':') else {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "Could not parse URL",
                ));
            };

            let port: u16 = port_str.parse().map_err(|_| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "Could not parse port number",
                )
            })?;

            let mut all_addresses: Vec<SocketAddr> = Vec::new();
            for addr in self.resolver.lookup_ip(host.to_string())?.iter() {
                all_addresses.push(SocketAddr::new(addr, port));
            }

            let addresses = Arc::new(all_addresses);
            self.cache.insert(url.to_string(), addresses.clone());
            addresses
        };
        let worker_addresses = worker.checked_rem(addresses.len()).ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                format!(
                    "Unexpected index: worker = {}, addresses len = {}",
                    worker,
                    addresses.len()
                ),
            )
        })?;
        // Safe because we bound the index with the length of `addresses`.
        #[allow(clippy::indexing_slicing)]
        Ok(vec![addresses[worker_addresses]])
    }
}
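
// Balancing sketch (illustrative): `resolve` above hands each worker exactly one address,
// picked as `worker % addresses.len()`. If a host resolves to 3 addresses, workers 0, 3, 6, ...
// use the first, workers 1, 4, 7, ... the second, and so on, spreading the parallel
// connections across the hosts behind the URL.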

// Timeout if connection can not be made in 10 seconds.
// const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);

// Timeout if no data received for 5 seconds.
// const DATA_READ_TIMEOUT: Duration = Duration::from_secs(5);

/// Minimum rational size of a chunk in bytes.
const MIN_CHUNK_SIZE: usize = 1024 * 4; // 4 KB

/// Parallel Downloader Tuning parameters
#[derive(Clone, Debug)]
#[allow(clippy::struct_excessive_bools)]
pub struct DlConfig {
    /// Maximum number of parallel connections to use.
    pub workers: usize,
    /// Size of a chunk in bytes (except the last).
    pub chunk_size: usize,
    /// Maximum number of chunks queued ahead to workers.
    pub queue_ahead: usize,
    /// Timeout for each connection.
    pub connection_timeout: Option<Duration>,
    /// Timeout for each data read.
    pub data_read_timeout: Option<Duration>,
}

impl DlConfig {
    /// Create a new `DlConfig`
    #[must_use]
    pub fn new() -> Self {
        DlConfig::default()
    }

    /// Change the number of workers
    #[must_use]
    pub fn with_workers(mut self, workers: usize) -> Self {
        self.workers = workers;
        self
    }

    /// Change the chunk size
    #[must_use]
    pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Change the number of chunks queued ahead to workers
    #[must_use]
    pub fn with_queue_ahead(mut self, queue_ahead: usize) -> Self {
        self.queue_ahead = queue_ahead;
        self
    }

    /// Change the connection timeout
    #[must_use]
    pub fn with_connection_timeout(mut self, connection_timeout: Duration) -> Self {
        self.connection_timeout = Some(connection_timeout);
        self
    }

    /// Change the data read timeout
    #[must_use]
    pub fn with_data_read_timeout(mut self, data_read_timeout: Duration) -> Self {
        self.data_read_timeout = Some(data_read_timeout);
        self
    }

    /// Resolve DNS addresses using Hickory Resolver
    fn resolve(url: &str, worker: usize) -> std::io::Result<Vec<SocketAddr>> {
        let Some(resolver) = RESOLVER.get() else {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                "Resolver not initialized.",
            ));
        };

        resolver.resolve(url, worker)
    }

    /// Builds a `UReq` Agent.
    ///
    /// Because we need multiple clients to prevent all traffic being forced onto a single
    /// connection when HTTP2 is used, the client can NOT be supplied by the user.
    /// Instead we create a new one here based on their configuration.
    pub(crate) fn make_http_agent(&self, worker: usize) -> ureq::Agent {
        let mut agent = ureq::AgentBuilder::new();

        if let Some(timeout) = self.connection_timeout {
            agent = agent.timeout_connect(timeout);
        }

        if let Some(timeout) = self.data_read_timeout {
            agent = agent.timeout_read(timeout);
        }

        let agent = agent.resolver(move |url: &str| Self::resolve(url, worker));

        agent.build()
    }
}

impl Default for DlConfig {
    fn default() -> Self {
        DlConfig {
            workers: 16,
            chunk_size: 2 * 1024 * 1024,
            queue_ahead: 3,
            connection_timeout: None,
            data_read_timeout: None,
        }
    }
}
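
// A minimal usage sketch of the `DlConfig` builder (values here are illustrative).
// With the defaults above (16 workers, 2 MiB chunks, 3 chunks queued ahead) the maximum
// potential working set noted on `ParallelDownloadProcessorInner` works out to
// 2 MiB * ((16 * 3) + 1) = 98 MiB.
//
//     let cfg = DlConfig::new()
//         .with_workers(8)
//         .with_chunk_size(4 * 1024 * 1024) // 4 MiB chunks
//         .with_queue_ahead(2)
//         .with_connection_timeout(Duration::from_secs(10))
//         .with_data_read_timeout(Duration::from_secs(5));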

/// An Individual Downloaded block of data.
/// Wrapped in an `Arc` so it's cheap to clone and pass between threads.
type DlBlock = Arc<Vec<u8>>;

/// Downloaded Chunk (or error if it fails).
#[derive(Clone)]
struct DlChunk {
    /// Index of the worker that fetched the chunk.
    worker: usize,
    /// Index of the chunk in the file.
    chunk_num: usize,
    /// The data from the chunk. (None == failed)
    chunk: Option<DlBlock>,
}

/// Download Chunk Work Order.
/// This is simply the number of the chunk next to fetch.
/// When finished, the queue is just closed.
type DlWorkOrder = usize;

/// Parallel Download Processor Inner struct.
///
/// Note: Maximum Potential Working set in memory will == `chunk_size` * ((`workers` *
/// `queue_ahead`) + 1)
struct ParallelDownloadProcessorInner {
    /// URL to download from.
    url: String,
    /// Configuration
    cfg: DlConfig,
    /// Size of the file we expect to download.
    file_size: usize,
    /// The last chunk we can request
    last_chunk: usize,
    /// Skip map used to reorder incoming chunks back into sequential order.
    reorder_queue: DashMap<usize, DlChunk>,
    /// A queue for each worker to send them new work orders.
    work_queue: DashMap<usize, crossbeam_channel::Sender<DlWorkOrder>>,
    /// New Chunk Queue - Just says we added a new chunk to the reorder queue.
    new_chunk_queue_tx: crossbeam_channel::Sender<Option<()>>,
    /// New Chunk Queue - Just says we added a new chunk to the reorder queue.
    new_chunk_queue_rx: crossbeam_channel::Receiver<Option<()>>,
    /// Statistic tracking number of bytes downloaded per worker.
    bytes_downloaded: Vec<AtomicU64>,
    /// Left Over Bytes (from the reader)
    left_over_bytes: Mutex<Option<(Arc<Vec<u8>>, usize)>>,
    /// Next Expected Chunk
    next_expected_chunk: AtomicUsize,
    /// Next Chunk to Request
    next_requested_chunk: AtomicUsize,
}

impl Drop for ParallelDownloadProcessorInner {
    /// Cleanup the channel and workers.
    fn drop(&mut self) {
        debug!("Drop ParallelDownloadProcessorInner");
        self.reorder_queue.clear();
        self.reorder_queue.shrink_to_fit();
        self.work_queue.clear();
        self.work_queue.shrink_to_fit();
    }
}

impl ParallelDownloadProcessorInner {
    /// Get how many bytes were downloaded, total.
    pub(crate) fn total_bytes(&self) -> u64 {
        self.bytes_downloaded
            .iter()
            .map(|x| x.load(Ordering::SeqCst))
            .sum::<u64>()
    }

    /// Get start offset of a chunk.
    fn chunk_start(&self, chunk: usize) -> usize {
        self.cfg.chunk_size.saturating_mul(chunk)
    }

    /// Get inclusive end offset of a chunk.
    fn chunk_end(&self, chunk: usize) -> usize {
        let start = self.chunk_start(chunk);
        if start.saturating_add(self.cfg.chunk_size) >= self.file_size {
            self.file_size.saturating_sub(1)
        } else {
            start.saturating_add(self.cfg.chunk_size).saturating_sub(1)
        }
    }
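
    // Worked example (illustrative): with the default 2 MiB (2_097_152 byte) chunk size,
    // chunk 3 spans `chunk_start(3)` = 6_291_456 to `chunk_end(3)` = 8_388_607 inclusive,
    // which `get_range` below turns into the header `Range: bytes=6291456-8388607`.
    // The final chunk is simply clipped to end at `file_size - 1`.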

    /// Sends a GET request to download a chunk of the file at the specified range
    fn get_range(&self, agent: &ureq::Agent, chunk: usize) -> Result<Arc<Vec<u8>>> {
        let range_start = self.chunk_start(chunk);
        let range_end_inclusive = self.chunk_end(chunk);
        let range_header = format!("bytes={range_start}-{range_end_inclusive}");
        let get_range_response = agent
            .get(&self.url)
            .set(RANGE.as_str(), &range_header)
            .call()
            .context("GET ranged request failed")?;
        // let addr = get_range_response.remote_addr();
        // debug!("Chunk {chunk} from {addr:?}");
        if get_range_response.status() != StatusCode::PARTIAL_CONTENT {
            bail!(
                "Response to range request has an unexpected status code (expected {}, found {})",
                StatusCode::PARTIAL_CONTENT,
                get_range_response.status()
            )
        }

        let range_size = range_end_inclusive
            .saturating_sub(range_start)
            .saturating_add(1);
        let mut bytes: Vec<u8> = Vec::with_capacity(range_size);

        let bytes_read = get_range_response
            .into_reader()
            .take(from_saturating(range_size))
            .read_to_end(&mut bytes)?;

        if bytes_read != range_size {
            bail!("Expected {range_size} bytes in response, but only read {bytes_read}")
        }

        Ok(Arc::new(bytes))
    }

    /// Queue Chunk to processor.
    ///
    /// Reorders chunks and sends to the consumer.
    fn reorder_queue(&self, chunk: DlChunk) -> Result<()> {
        self.reorder_queue.insert(chunk.chunk_num, chunk);
        self.new_chunk_queue_tx.send(Some(()))?;
        Ok(())
    }
}

/// Parallel Download Processor.
///
/// Uses multiple connections to speed up downloads, but returns data sequentially
/// so it can be processed without needing to store the whole file in memory or on disk.
#[derive(Clone)]
pub(crate) struct ParallelDownloadProcessor(Arc<ParallelDownloadProcessorInner>);

impl ParallelDownloadProcessor {
    /// Creates a new instance of the Parallel Download Processor.
    ///
    /// Can Fail IF there is no HTTP client provided or the URL does not support getting
    /// the content length.
    pub(crate) async fn new(url: &str, mut cfg: DlConfig, chain: Network) -> Result<Self> {
        if cfg.chunk_size < MIN_CHUNK_SIZE {
            bail!(
                "Download chunk size must be at least {} bytes",
                MIN_CHUNK_SIZE
            );
        }
        let file_size = get_content_length_async(url).await?;

        // Get the minimum number of workers we need, just in case the chunk size is bigger than
        // the requested workers can process.
        cfg.workers = file_size.div_ceil(cfg.chunk_size).min(cfg.workers);

        let last_chunk = file_size.div_ceil(cfg.chunk_size);

        // Initialize the download statistics
        let mut bytes_downloaded = Vec::with_capacity(cfg.workers);
        for _ in 0..cfg.workers {
            bytes_downloaded.push(AtomicU64::new(0));
        }

        let new_chunk_queue = crossbeam_channel::unbounded();

        let processor = ParallelDownloadProcessor(Arc::new(ParallelDownloadProcessorInner {
            url: String::from(url),
            cfg: cfg.clone(),
            file_size,
            last_chunk,
            reorder_queue: DashMap::with_capacity(
                cfg.workers
                    .saturating_mul(cfg.queue_ahead)
                    .saturating_add(1),
            ),
            work_queue: DashMap::with_capacity(cfg.workers.saturating_add(1)),
            new_chunk_queue_rx: new_chunk_queue.1,
            new_chunk_queue_tx: new_chunk_queue.0,
            bytes_downloaded,
            left_over_bytes: Mutex::new(None),
            next_expected_chunk: AtomicUsize::new(0),
            next_requested_chunk: AtomicUsize::new(0),
        }));

        processor.start_workers(chain)?;

        Ok(processor)
    }
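
    // Sizing example (illustrative): a 100 MiB file with the default 2 MiB chunk size gives
    // `file_size.div_ceil(chunk_size)` = 50 chunks, so `last_chunk` = 50 and the worker count
    // is capped at `min(50, cfg.workers)`.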

    /// Starts the worker tasks; they will not start doing any work until `download` is
    /// called, which happens immediately after they are started.
    fn start_workers(&self, chain: Network) -> Result<()> {
        for worker in 0..self.0.cfg.workers {
            // The channel is unbounded, because work distribution is controlled to be at most
            // `work_queue` deep per worker. And we don't want anything unexpected to
            // cause the processor to block.
            let (work_queue_tx, work_queue_rx) = crossbeam_channel::unbounded::<DlWorkOrder>();
            let params = self.0.clone();
            thread::spawn(move || {
                let worker_name = &format!("{}::{worker}", stats::thread::name::PARALLEL_DL_WORKER);

                stats::start_thread(chain, worker_name, false);
                Self::worker(&params, worker, worker_name, &work_queue_rx, chain);
                stats::stop_thread(chain, worker_name);
            });

            let _unused = self.0.work_queue.insert(worker, work_queue_tx);
        }

        self.download()
    }

    /// Call the work queue receiver.
    /// This is a helper function to pause and resume the stats thread.
    fn call_work_queue_receiver(
        chain: Network, worker_name: &str, work_queue: &Receiver<usize>,
    ) -> Result<usize, RecvError> {
        stats::pause_thread(chain, worker_name);
        let recv = work_queue.recv();
        stats::resume_thread(chain, worker_name);
        recv
    }

    /// The worker task - It is running in parallel and downloads chunks of the file as
    /// requested.
    fn worker(
        params: &Arc<ParallelDownloadProcessorInner>, worker_id: usize, worker_name: &str,
        work_queue: &crossbeam_channel::Receiver<DlWorkOrder>, chain: Network,
    ) {
        debug!("Worker {worker_id} started");

        // Each worker has its own http_client, so there is no cross worker pathology.
        // Each worker should be expected to make multiple requests to the same host.
        // Resolver should never fail to initialize. However, if it does, we can't start the
        // worker.
        if let Err(error) = BalancingResolver::init(&params.cfg) {
            error!("Failed to initialize DNS resolver for worker {worker_id}: {error:?}");
            return;
        }
        let http_agent = params.cfg.make_http_agent(worker_id);

        while let Ok(next_chunk) = Self::call_work_queue_receiver(chain, worker_name, work_queue) {
            // Add a small delay to the first chunks for each worker.
            // So that the leading chunks are more likely to finish downloading first.
            if next_chunk > 0 && next_chunk < params.cfg.workers {
                if let Some(delay) = (next_chunk as u64).checked_mul(2) {
                    thread::sleep(Duration::from_millis(delay));
                } else {
                    // This should never happen.
                    error!("Next chunk delay overflow");
                }
            }
            let mut retries = 0u8;
            let mut block;
            // debug!("Worker {worker_id} DL chunk {next_chunk}");
            loop {
                block = match params.get_range(&http_agent, next_chunk) {
                    Ok(block) => Some(block),
                    Err(error) => {
                        error!("Error getting chunk: {:?}, error: {:?}", next_chunk, error);
                        None
                    },
                };

                // Quickly retry on error, in case it's transient.
                if block.is_some() || retries > 3 {
                    break;
                }
                retries = retries.saturating_add(1);
            }
            // debug!("Worker {worker_id} DL chunk done {next_chunk}: {retries}");

            if let Some(ref block) = block {
                if let Some(dl_stat) = params.bytes_downloaded.get(worker_id) {
                    let this_bytes_downloaded = from_saturating(block.len());
                    let _last_bytes_downloaded =
                        dl_stat.fetch_add(this_bytes_downloaded, Ordering::SeqCst);
                    // debug!("Worker {worker_id} DL chunk {next_chunk}:
                    // {last_bytes_downloaded} + {this_bytes_downloaded} = {}",
                    // last_bytes_downloaded+this_bytes_downloaded);
                } else {
                    error!("Failed to get bytes downloaded for worker {worker_id}");
                }
            }

            if let Err(error) = params.reorder_queue(DlChunk {
                worker: worker_id,
                chunk_num: next_chunk,
                chunk: block,
            }) {
                error!("Error sending chunk: {:?}, error: {:?}", next_chunk, error);
                break;
            };
            // debug!("Worker {worker_id} DL chunk queued {next_chunk}");
        }
        debug!("Worker {worker_id} ended");
    }

    /// Send a work order to a worker.
    fn send_work_order(&self, this_worker: usize, order: DlWorkOrder) -> Result<usize> {
        let next_worker = this_worker
            .checked_add(1)
            .and_then(|w| w.checked_rem(self.0.cfg.workers))
            .ok_or_else(|| anyhow!("Unable to calculate next worker: this_worker = {this_worker}, num workers = {}", self.0.cfg.workers))?;
        if order < self.0.last_chunk {
            // let params = self.0.clone();
            if let Some(worker_queue) = self.0.work_queue.get(&this_worker) {
                let queue = worker_queue.value();
                queue.send(order)?;
            } else {
                bail!("Expected a work queue for worker: {:?}", this_worker);
            }
        } else {
            // No more work, so remove the work queue from the map.
            if let Some((_, work_queue)) = self.0.work_queue.remove(&this_worker) {
                // Close the work queue, which should terminate the worker.
                drop(work_queue);
            }
        }
        Ok(next_worker)
    }

    /// Starts Downloading the file using parallel connections.
    ///
    /// Should only be called once on self.
    fn download(&self) -> Result<()> {
        let params = self.0.clone();
        // Pre fill the work queue with orders.
        let max_pre_orders = params.cfg.queue_ahead.saturating_mul(params.cfg.workers);
        let pre_orders = max_pre_orders.min(params.last_chunk);

        let mut this_worker: usize = 0;

        // Fill up the pre-orders into the workers' queues.
        for pre_order in 0..pre_orders {
            this_worker = self.send_work_order(this_worker, pre_order)?;
        }

        params
            .next_requested_chunk
            .store(pre_orders, Ordering::SeqCst);

        Ok(())
    }

    /// Get current size of data we downloaded.
    pub(crate) fn dl_size(&self) -> u64 {
        self.0.total_bytes()
    }
590
    /// Actual Read function, done like this so we can have a single cleanup on error or
591
    /// EOF.
592
    fn inner_read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
×
593
        // There should only ever be one reader, the purpose of this mutex is to give us
594
        // mutability it should never actually block.
595
        let mut left_over_buffer = self
×
596
            .0
×
597
            .left_over_bytes
×
598
            .lock()
×
599
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("{e:?}")))?;
×
600

601
        let (left_over_bytes, offset) =
×
602
            if let Some((left_over_bytes, offset)) = left_over_buffer.take() {
×
603
                (left_over_bytes, offset)
×
604
            } else {
605
                // Get the next chunk and inc the one we would want next.
606
                let next_chunk = self.0.next_expected_chunk.fetch_add(1, Ordering::SeqCst);
×
607

608
                // Wait here until we actually have the next chunk in the reorder queue.
609
                while !self.0.reorder_queue.contains_key(&next_chunk) {
×
610
                    if let Err(error) = self.0.new_chunk_queue_rx.recv() {
×
611
                        return Err(std::io::Error::new(
×
612
                            std::io::ErrorKind::Other,
×
613
                            format!("Next Chunk Queue Error: {error:?}"),
×
614
                        ));
×
615
                    }
×
616
                }
617

618
                let Some((_, chunk)) = self.0.reorder_queue.remove(&next_chunk) else {
×
619
                    return Err(std::io::Error::new(
×
620
                        std::io::ErrorKind::Other,
×
621
                        format!("Expected Chunk {next_chunk} Didn't get any"),
×
622
                    ));
×
623
                };
624

625
                if chunk.chunk_num != next_chunk {
×
626
                    return Err(std::io::Error::new(
×
627
                        std::io::ErrorKind::Other,
×
628
                        format!("Expected Chunk {next_chunk} Got {}", chunk.chunk_num),
×
629
                    ));
×
630
                }
×
631
                let Some(ref block) = chunk.chunk else {
×
632
                    return Ok(0); // EOF
×
633
                };
634

635
                // Got a chunk so lets queue more work from the worker that gave us this block.
636
                // Because we are pre-incrementing here, its possible for this to be > maximum
637
                // chunks and thats OK.
638
                let next_work_order = self.0.next_requested_chunk.fetch_add(1, Ordering::SeqCst);
×
639

640
                // Send more work to the worker that just finished a work order.
641
                // Or Stop the worker if there is no more work they can do.
642
                if let Err(error) = self.send_work_order(chunk.worker, next_work_order) {
×
643
                    return Err(std::io::Error::new(
×
644
                        std::io::ErrorKind::Other,
×
645
                        format!("Failed to send work order to {} : {error:?}", chunk.worker),
×
646
                    ));
×
647
                }
×
648

×
649
                // If this was the last chunk, we can stop all the workers and cleanup.
×
650
                if next_chunk == self.0.last_chunk {
×
651
                    debug!("Last Chunk read from workers. Cleaning Up.");
×
652
                    self.cleanup();
×
653
                }
×
654

655
                (block.to_owned(), 0)
×
656
            };
657

658
        // Send whats leftover or new.
659
        let bytes_left = left_over_bytes.len().checked_sub(offset).ok_or_else(|| {
×
660
            std::io::Error::new(
×
661
                std::io::ErrorKind::Other,
×
662
                format!(
×
663
                    "Invalid left over bytes value: {}, offset = {}",
×
664
                    left_over_bytes.len(),
×
665
                    offset
×
666
                ),
×
667
            )
×
668
        })?;
×
669
        let bytes_to_copy = bytes_left.min(buf.len());
×
670
        let sub_buf = offset
×
671
            .checked_add(bytes_to_copy)
×
672
            .and_then(|upper_bound| left_over_bytes.get(offset..upper_bound))
×
673
            .ok_or_else(|| {
×
674
                std::io::Error::new(std::io::ErrorKind::Other, "Slicing Sub Buffer failed")
×
675
            })?;
×
676
        if let Err(error) = memx::memcpy(buf, sub_buf) {
×
677
            error!(error=?error, "memx::memcpy failed");
×
678
            return Err(std::io::Error::new(
×
679
                std::io::ErrorKind::Other,
×
680
                "memx::memcpy failed",
×
681
            ));
×
682
        }
×
683

×
684
        // Save whats leftover back inside the mutex, if there is anything.
×
685
        if offset.saturating_add(bytes_to_copy) != left_over_bytes.len() {
×
686
            *left_over_buffer = Some((left_over_bytes, offset.saturating_add(bytes_to_copy)));
×
687
        }
×
688

689
        Ok(bytes_to_copy)
×
690
    }
×
691
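
    // Sequencing sketch (illustrative): if chunks arrive out of order (say 2, then 0, then 1),
    // `inner_read` above blocks on `new_chunk_queue_rx` until chunk 0 appears in
    // `reorder_queue`, hands its bytes to the caller, then does the same for chunks 1 and 2.
    // Each chunk consumed also sends the next work order back to the worker that produced it,
    // keeping each worker roughly `queue_ahead` work orders ahead of the reader.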

    /// Cleanup workers and queues when done.
    fn cleanup(&self) {
        // Close all the workers left running.
        for x in &self.0.work_queue {
            let worker = x.key();
            if let Some((_, queue)) = self.0.work_queue.remove(worker) {
                debug!("Force Closing worker {}", worker);
                drop(queue);
            }
        }
    }
}

impl Drop for ParallelDownloadProcessor {
    fn drop(&mut self) {
        debug!("ParallelDownloadProcessor::drop");
        self.cleanup();
    }
}

impl Read for ParallelDownloadProcessor {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let result = self.inner_read(buf);
        match result {
            Ok(0) | Err(_) => {
                debug!("Read finished with Error or EOF - Cleaning up.");
                self.cleanup();
            },
            _ => {},
        }

        result
    }
}
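
// End-to-end usage sketch (illustrative; the URL and `chain` value are placeholders):
// construct the processor asynchronously, then consume it through the blocking
// `std::io::Read` impl above, so the file can be processed sequentially while the
// workers keep fetching later chunks in parallel.
//
//     let mut dl = ParallelDownloadProcessor::new(
//         "https://example.com/large.file",
//         DlConfig::default(),
//         chain, // some `Network` value
//     )
//     .await?;
//
//     let mut buf = vec![0u8; 1024 * 1024];
//     loop {
//         let n = dl.read(&mut buf)?;
//         if n == 0 {
//             break; // EOF
//         }
//         // process `buf[..n]` here, in file order
//     }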

/// Send a HEAD request to obtain the length of the file we want to download (necessary
/// for calculating the offsets of the chunks)
///
/// This exists because the `Probe` call made by Mithril is Async, and this makes
/// interfacing to that easier.
async fn get_content_length_async(url: &str) -> Result<usize> {
    let url = url.to_owned();
    match tokio::task::spawn_blocking(move || get_content_length(&url)).await {
        Ok(result) => result,
        Err(error) => {
            error!("get_content_length failed");
            Err(anyhow::anyhow!("get_content_length failed: {}", error))
        },
    }
}
743
/// Send a HEAD request to obtain the length of the file we want to download (necessary
744
/// for calculating the offsets of the chunks)
745
fn get_content_length(url: &str) -> Result<usize> {
×
746
    let response = ureq::head(url).call()?;
×
747

748
    if response.status() != StatusCode::OK {
×
749
        bail!(
×
750
            "HEAD request did not return a successful response: {}",
×
751
            response.status_text()
×
752
        );
×
753
    }
×
754

755
    if let Some(accept_ranges) = response.header(ACCEPT_RANGES.as_str()) {
×
756
        if accept_ranges != "bytes" {
×
757
            bail!(
×
758
                "Server doesn't support HTTP range byte requests (Accept-Ranges = {})",
×
759
                accept_ranges
×
760
            );
×
761
        }
×
762
    } else {
763
        bail!("Server doesn't support HTTP range requests (missing ACCEPT_RANGES header)");
×
764
    };
765

766
    let content_length = if let Some(content_length) = response.header(CONTENT_LENGTH.as_str()) {
×
767
        let content_length: usize = content_length
×
768
            .parse()
×
769
            .context("Content-Length was not a valid unsigned integer")?;
×
770
        content_length
×
771
    } else {
772
        bail!("HEAD response did not contain a Content-Length header");
×
773
    };
774

775
    Ok(content_length)
×
776
}
×