• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ensc / r-tftpd / 15401197386

02 Jun 2025 07:37PM UTC coverage: 77.043%. First build
15401197386

Pull #14

github

web-flow
Merge 3873a9996 into f4721109a
Pull Request #14: CI: update github workflows

2 of 6 new or added lines in 4 files covered. (33.33%)

1923 of 2496 relevant lines covered (77.04%)

350.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.07
/mod-proxy/src/cache.rs
1
use std::{os::unix::prelude::AsRawFd, time::Instant};
2
use std::sync::Arc;
3
use tokio::sync::RwLock;
4
use std::collections::HashMap;
5
// use chrono::{ NaiveDateTime, Utc };
6
use std::time::Duration;
7

8
use super::http;
9
use http::Time;
10

11
use crate::{ Result, Error };
12
use crate::util::pretty_dump_wrap as pretty;
13

14
const READ_TIMEOUT:    std::time::Duration = std::time::Duration::from_secs(30);
15
const CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
16
const KEEPALIVE:       std::time::Duration = std::time::Duration::from_secs(300);
17

18
lazy_static::lazy_static!{
19
    static ref CACHE: std::sync::RwLock<CacheImpl> = std::sync::RwLock::new(CacheImpl::new());
20
}
21

22
#[derive(Clone, Copy, Debug, Default)]
23
struct Stats {
24
    pub tm:                Duration,
25
}
26

27
impl Stats {
28
    pub async fn chunk(&mut self, response: &mut reqwest::Response) -> reqwest::Result<Option<bytes::Bytes>>
232✔
29
    {
232✔
30
        let start = std::time::Instant::now();
232✔
31
        let chunk = response.chunk().await;
232✔
32
        self.tm += start.elapsed();
232✔
33

34
        chunk
232✔
35
    }
232✔
36
}
37

38
impl crate::util::PrettyDump for Stats {
39
    fn pretty_dump(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
40
        f.write_fmt(format_args!("{:.3}s", self.tm.as_secs_f32()))
×
41
    }
×
42
}
43

44
#[derive(Debug)]
45
enum State {
46
    None,
47

48
    Error(&'static str),
49

50
    Init {
51
        response:        reqwest::Response,
52
    },
53

54
    HaveMeta {
55
        response:        reqwest::Response,
56
        cache_info:        http::CacheInfo,
57
        file_size:        Option<u64>,
58
        stats:                Stats,
59
    },
60

61
    Downloading {
62
        response:        reqwest::Response,
63
        cache_info:        http::CacheInfo,
64
        file_size:        Option<u64>,
65
        file:                std::fs::File,
66
        file_pos:        u64,
67
        stats:                Stats,
68
    },
69

70
    Complete {
71
        cache_info:        http::CacheInfo,
72
        file:                std::fs::File,
73
        file_size:        u64,
74
    },
75

76
    Refresh {
77
        response:        reqwest::Response,
78
        cache_info:        http::CacheInfo,
79
        file:                std::fs::File,
80
        file_size:        u64,
81
    },
82
}
83

84
impl crate::util::PrettyDump for State {
85
    fn pretty_dump(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
472✔
86
        match self {
472✔
87
            State::None =>
88
                f.write_str("no state"),
×
89
            State::Error(e) =>
×
NEW
90
                f.write_fmt(format_args!("error {e:?}")),
×
91
            State::Init { response } =>
×
92
                f.write_fmt(format_args!("INIT({})", pretty(response))),
×
93
            State::HaveMeta { response, cache_info, file_size, stats } =>
×
94
                f.write_fmt(format_args!("META({}, {}, {}, {})",
×
95
                                         pretty(response), pretty(cache_info),
×
96
                                         pretty(file_size), pretty(stats))),
×
97
            State::Downloading { response, cache_info, file_size, file, file_pos, stats } =>
×
98
                f.write_fmt(format_args!("DOWNLOADING({}, {}, {}, {}@{}, {})",
×
99
                                         pretty(response), pretty(cache_info),
×
100
                                         pretty(file_size), pretty(file),
×
101
                                         file_pos, pretty(stats))),
×
102
            State::Complete { cache_info, file, file_size } =>
472✔
103
                f.write_fmt(format_args!("COMPLETE({}, {}/{})",
472✔
104
                                         pretty(cache_info), pretty(file), file_size)),
472✔
105
            State::Refresh { response, cache_info, file, file_size } =>
×
106
                f.write_fmt(format_args!("REFRESH({},.{}, [{}/{}])",
×
107
                                         pretty(response), pretty(cache_info), pretty(file),
×
108
                                         file_size)),
109
        }
110
    }
472✔
111
}
112

113
impl State {
114
    pub fn take(&mut self, hint: &'static str) -> Self {
448✔
115
        std::mem::replace(self, State::Error(hint))
448✔
116
    }
448✔
117

118
    pub fn is_none(&self) -> bool {
×
119
        matches!(self, Self::None)
×
120
    }
×
121

122
    pub fn is_init(&self) -> bool {
1,398✔
123
        matches!(self, Self::Init { .. })
1,398✔
124
    }
1,398✔
125

126
    pub fn is_error(&self) -> bool {
108✔
127
        matches!(self, Self::Error(_))
108✔
128
    }
108✔
129

130
    pub fn is_refresh(&self) -> bool {
×
131
        matches!(self, Self::Refresh { .. })
×
132
    }
×
133

134
    pub fn is_have_meta(&self) -> bool {
116✔
135
        matches!(self, Self::HaveMeta { .. })
116✔
136
    }
116✔
137

138
    pub fn is_downloading(&self) -> bool {
116✔
139
        matches!(self, Self::Downloading { .. })
116✔
140
    }
116✔
141

142
    pub fn is_complete(&self) -> bool {
×
143
        matches!(self, Self::Complete { .. })
×
144
    }
×
145

146
    pub fn get_file_size(&self) -> Option<u64> {
108✔
147
        match self {
108✔
148
            Self::None |
149
            Self::Init { .. }        => None,
×
150

151
            Self::HaveMeta { file_size, .. }        => *file_size,
108✔
152
            Self::Downloading { file_size, .. }        => *file_size,
×
153

154
            Self::Complete { file_size, .. }        => Some(*file_size),
×
155
            Self::Refresh { file_size, .. }        => Some(*file_size),
×
156

157
            Self::Error(hint)        => panic!("get_file_size called in error state ({hint})"),
×
158
        }
159
    }
108✔
160

161
    pub fn get_cache_info(&self) -> Option<&http::CacheInfo> {
116✔
162
        match self {
116✔
163
            Self::None |
164
            Self::Error(_) |
165
            Self::Init { .. }        => None,
100✔
166

167
            Self::HaveMeta { cache_info, .. } |
×
168
            Self::Downloading { cache_info, .. } |
×
169
            Self::Complete { cache_info, .. } |
16✔
170
            Self::Refresh { cache_info, .. }        => Some(cache_info),
16✔
171
        }
172
    }
116✔
173

174
    fn read_file(file: &std::fs::File, ofs: u64, buf: &mut [u8], max: u64) -> Result<usize> {
1,058✔
175
        use nix::libc;
176

177
        assert!(max > ofs);
1,058✔
178

179
        let len = (buf.len() as u64).min(max - ofs) as usize;
1,058✔
180
        let buf_ptr = buf.as_mut_ptr() as *mut libc::c_void;
1,058✔
181

182
        // TODO: this would be nice, but does not work because we can not get
183
        // a mutable reference to 'file'
184
        //file.flush()?;
185

186
        let rc = unsafe { libc::pread(file.as_raw_fd(), buf_ptr, len, ofs as i64) };
1,058✔
187

188
        if rc < 0 {
1,058✔
189
            return Err(std::io::Error::last_os_error().into());
×
190
        }
1,058✔
191

192
        Ok(len)
1,058✔
193
    }
1,058✔
194

195
    pub fn read(&self, ofs: u64, buf: &mut [u8]) -> Result<Option<usize>> {
1,290✔
196
        match &self {
1,182✔
197
            State::Downloading { file, file_pos, .. } if ofs < *file_pos        => {
1,182✔
198
                Self::read_file(file, ofs, buf, *file_pos)
1,058✔
199
            },
200

201
            State::Complete { file, file_size, .. } if ofs < *file_size                => {
×
202
                Self::read_file(file, ofs, buf, *file_size)
×
203
            }
204

205
            State::Complete { file_size, .. } if ofs == *file_size        => Ok(0),
×
206

207
            State::Complete { file_size, .. } if ofs >= *file_size        =>
×
208
                Err(Error::Internal("file out-of-bound read")),
×
209

210
            _        => return Ok(None)
232✔
211
        }.map(Some)
1,058✔
212
    }
1,290✔
213

214
    pub fn is_outdated(&self, reftm: Instant, max_lt: Duration) -> bool {
×
215
        match self.get_cache_info() {
×
216
            None        => true,
×
217
            Some(info)        => info.is_outdated(reftm, max_lt),
×
218
        }
219
    }
×
220
}
221

222
#[derive(Debug)]
223
pub struct EntryData {
224
    pub key:                url::Url,
225
    state:                State,
226
    reftm:                Time,
227
}
228

229
impl std::fmt::Display for EntryData {
230
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
472✔
231
        f.write_fmt(format_args!("{}: reftm={}, state={}", self.key,
472✔
232
                    pretty(&self.reftm.local), pretty(&self.state)))
472✔
233
    }
472✔
234
}
235

236
impl EntryData {
237
    pub fn new(url: &url::Url) -> Self {
100✔
238
        Self {
100✔
239
            key:                url.clone(),
100✔
240
            state:                State::None,
100✔
241
            reftm:                Time::now(),
100✔
242
        }
100✔
243
    }
100✔
244

245
    pub fn is_complete(&self) -> bool {
×
246
        self.state.is_complete()
×
247
    }
×
248

249
    pub fn is_error(&self) -> bool {
108✔
250
        self.state.is_error()
108✔
251
    }
108✔
252

253
    pub fn is_running(&self) -> bool {
116✔
254
        self.state.is_have_meta() || self.state.is_downloading()
116✔
255
    }
116✔
256

257
    pub fn update_localtm(&mut self) {
116✔
258
        self.reftm = Time::now();
116✔
259
    }
116✔
260

261
    pub fn set_response(&mut self, response: reqwest::Response) {
108✔
262
        self.state = match self.state.take("set_respone") {
108✔
263
            State::None |
264
            State::Error(_)        => State::Init { response },
108✔
265

266
            State::Complete { cache_info, file, file_size } |
×
267
            State::Refresh { cache_info, file, file_size, .. } => State::Refresh {
×
268
                cache_info:        cache_info,
×
269
                file:                file,
×
270
                file_size:        file_size,
×
271
                response:        response,
×
272
            },
×
273

274
            s                        => panic!("unexpected state {s:?}"),
×
275
        }
276
    }
108✔
277

278
    pub fn is_outdated(&self, reftm: Instant, max_lt: Duration) -> bool {
×
279
        self.state.is_outdated(reftm, max_lt)
×
280
    }
×
281

282
    pub fn get_cache_info(&self) -> Option<&http::CacheInfo> {
×
283
        self.state.get_cache_info()
×
284
    }
×
285

286
    pub async fn fill_meta(&mut self) -> Result<()> {
108✔
287
        if !self.state.is_init() && !self.state.is_none() && !self.state.is_refresh() {
108✔
288
            return Ok(());
×
289
        }
108✔
290

291
        self.state = match self.state.take("fill_meta") {
108✔
292
            State::None                        => panic!("unexpected state"),
×
293

294
            State::Init{ response }        => {
108✔
295
                let hdrs = response.headers();
108✔
296

297
                State::HaveMeta {
298
                    cache_info:        http::CacheInfo::new(self.reftm, hdrs)?,
108✔
299
                    file_size:        response.content_length(),
108✔
300
                    response:        response,
108✔
301
                    stats:        Stats::default(),
108✔
302
                }
303
            },
304

305
            State::Refresh { file, file_size, response, cache_info }        => {
×
306
                let hdrs = response.headers();
×
307

308
                State::Complete {
309
                    cache_info:        cache_info.update(self.reftm, hdrs)?,
×
310
                    file:        file,
×
311
                    file_size:        file_size,
×
312
                }
313
            },
314

315
            _                                => unreachable!(),
×
316
        };
317

318
        Ok(())
108✔
319
    }
108✔
320

321
    fn signal_complete(&self, stats: Stats) {
232✔
322
        if let State::Complete { file_size, .. } = self.state {
232✔
323
            info!("downloaded {} with {} bytes in {}ms", self.key, file_size, stats.tm.as_millis());
108✔
324
        }
124✔
325
    }
232✔
326

327
    #[instrument(level = "trace")]
328
    pub async fn get_filesize(&mut self) -> Result<u64> {
329
        use std::io::Write;
330

331
        if let Some(sz) = self.state.get_file_size() {
332
            return Ok(sz);
333
        }
334

335
        match self.state.take("get_filesize") {
336
            State::HaveMeta { mut response, file_size: None, mut stats, cache_info }        => {
337
                let mut file = Cache::new_file()?;
338
                let mut pos = 0;
339

340

341
                while let Some(chunk) = stats.chunk(&mut response).await? {
342
                    pos += chunk.len() as u64;
343
                    file.write_all(&chunk)?;
344
                }
345

346
                self.state = State::Complete {
347
                    file:        file,
348
                    file_size:        pos,
349
                    cache_info:        cache_info,
350
                };
351

352
                self.signal_complete(stats);
353

354
                Ok(pos)
355
            },
356

357
            State::Downloading { mut response, mut file, file_pos, file_size: None, mut stats, cache_info } => {
358
                let mut pos = file_pos;
359

360
                while let Some(chunk) = stats.chunk(&mut response).await? {
361
                    pos += chunk.len() as u64;
362
                    file.write_all(&chunk)?;
363
                }
364

365
                self.state = State::Complete {
366
                    file:        file,
367
                    file_size:        pos,
368
                    cache_info:        cache_info,
369
                };
370

371
                self.signal_complete(stats);
372

373
                Ok(pos)
374
            }
375

376
            s                => panic!("unexpected state: {s:?}"),
377
        }
378
    }
379

380
    pub fn fill_request(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
116✔
381
        match self.state.get_cache_info() {
116✔
382
            Some(info)        => info.fill_request(self.reftm.mono, req),
16✔
383
            None        => req,
100✔
384
        }
385
    }
116✔
386

387
    pub fn matches(&self, etag: Option<&str>) -> bool {
×
388
        let cache_info = self.state.get_cache_info();
×
389

390
        match cache_info.and_then(|c| c.not_after) {
×
391
            Some(t) if t < Time::now().mono                => return false,
×
392
            _                                                => {},
×
393
        }
394

395
        let self_etag = match cache_info {
×
396
            Some(c)        => c.etag.as_ref(),
×
397
            None        => None,
×
398
        };
399

400
        match (self_etag, etag) {
×
401
            (Some(a), Some(b)) if a == b                => {},
×
402
            (None, None)                                => {},
×
403
            _                                                => return false,
×
404
        }
405

406
        true
×
407
    }
×
408

409
    pub fn invalidate(&mut self)
108✔
410
    {
411
        match &self.state {
108✔
412
            State::Refresh { .. }        => self.state = State::None,
×
413
            State::Complete { .. }        => self.state = State::None,
16✔
414
            _                                => {},
92✔
415
        }
416
    }
108✔
417

418
    pub async fn read_some(&mut self, ofs: u64, buf: &mut [u8]) -> Result<usize>
1,290✔
419
    {
1,290✔
420
        use std::io::Write;
421

422
        trace!("state={:?}, ofs={}, #buf={}", self.state, ofs, buf.len());
1,290✔
423

424
        async fn fetch(response: &mut reqwest::Response, file: &mut std::fs::File,
232✔
425
                       buf: &mut [u8], stats: &mut Stats) -> Result<(usize, usize)> {
232✔
426
            match stats.chunk(response).await? {
232✔
427
                Some(data)        => {
124✔
428
                    let len = buf.len().min(data.len());
124✔
429

430
                    buf[0..len].clone_from_slice(&data.as_ref()[0..len]);
124✔
431
                    file.write_all(&data)?;
124✔
432

433
                    // TODO: it would be better to do this in State::read_file()
434
                    file.flush()?;
124✔
435

436
                    Ok((len, data.len()))
124✔
437
                },
438

439
                None                => Ok((0, 0))
108✔
440
            }
441
        }
232✔
442

443
        if self.state.is_init() {
1,290✔
444
            self.fill_meta().await?;
×
445
        }
1,290✔
446

447
        if let Some(sz) = self.state.read(ofs, buf)? {
1,290✔
448
            return Ok(sz);
1,058✔
449
        }
232✔
450

451
        match self.state.take("read_some") {
232✔
452
            State::HaveMeta { mut response, cache_info, file_size, mut stats }        => {
108✔
453
                let mut file = Cache::new_file()?;
108✔
454

455
                let res = fetch(&mut response, &mut file, buf, &mut stats).await?;
108✔
456

457
                self.state = match res {
108✔
458
                    (_, 0)        => State::Complete {
4✔
459
                        cache_info:        cache_info,
4✔
460
                        file:                file,
4✔
461
                        file_size:        0,
4✔
462
                    },
4✔
463

464
                    (_, sz)        => State::Downloading {
104✔
465
                        response:        response,
104✔
466
                        cache_info:        cache_info,
104✔
467
                        file_size:        file_size,
104✔
468
                        file:                file,
104✔
469
                        file_pos:        sz as u64,
104✔
470
                        stats:                stats,
104✔
471
                    }
104✔
472
                };
473

474
                self.signal_complete(stats);
108✔
475

476
                Ok(res.0)
108✔
477
            },
478

479
            // catched by 'self.state.read()' above
480
            State::Downloading { file_pos, .. } if ofs < file_pos        => unreachable!(),
124✔
481

482
            State::Downloading { mut response, cache_info, file_size, mut file, file_pos, mut stats } => {
124✔
483
                let res = fetch(&mut response, &mut file, buf, &mut stats).await?;
124✔
484

485
                self.state = match res {
124✔
486
                    (_, 0)        => State::Complete {
104✔
487
                        cache_info:        cache_info,
104✔
488
                        file:                file,
104✔
489
                        file_size:        file_pos,
104✔
490
                    },
104✔
491

492
                    (_, sz)        => State::Downloading {
20✔
493
                        response:        response,
20✔
494
                        cache_info:        cache_info,
20✔
495
                        file_size:        file_size,
20✔
496
                        file:                file,
20✔
497
                        file_pos:        file_pos + (sz as u64),
20✔
498
                        stats:                stats,
20✔
499
                    }
20✔
500
                };
501

502
                self.signal_complete(stats);
124✔
503

504
                Ok(res.0)
124✔
505
            }
506

507
            s                => panic!("unexpected state: {s:?}"),
×
508
        }
509

510
    }
1,290✔
511
}
512

513
pub type Entry = Arc<RwLock<EntryData>>;
514

515
struct CacheImpl {
516
    tmpdir:        std::path::PathBuf,
517
    entries:        HashMap<url::Url, Entry>,
518
    client:        Arc<reqwest::Client>,
519
    refcnt:        u32,
520

521
    abort_ch:        Option<tokio::sync::watch::Sender<()>>,
522
    gc:                Option<tokio::task::JoinHandle<()>>,
523
}
524

525
pub enum LookupResult {
526
    Found(Entry),
527
    Missing,
528
}
529

530
impl CacheImpl {
531
    fn new() -> Self {
1✔
532
        let client = reqwest::Client::builder()
1✔
533
            .read_timeout(READ_TIMEOUT)
1✔
534
            .connect_timeout(CONNECT_TIMEOUT)
1✔
535
            .tcp_keepalive(KEEPALIVE)
1✔
536
            .build()
1✔
537
            .unwrap();
1✔
538

539
        Self {
1✔
540
            tmpdir:        std::env::temp_dir(),
1✔
541
            entries:        HashMap::new(),
1✔
542
            client:        Arc::new(client),
1✔
543
            abort_ch:        None,
1✔
544
            refcnt:        0,
1✔
545
            gc:                None,
1✔
546
        }
1✔
547
    }
1✔
548

549
    pub fn is_empty(&self) -> bool {
2✔
550
        self.entries.is_empty()
2✔
551
    }
2✔
552

553
    pub fn clear(&mut self) {
12✔
554
        self.entries.clear();
12✔
555
    }
12✔
556

557
    pub fn get_client(&self) -> Arc<reqwest::Client> {
116✔
558
        self.client.clone()
116✔
559
    }
116✔
560

561
    pub fn lookup_or_create(&mut self, key: &url::Url) -> Entry {
44✔
562
        match self.entries.get(key) {
44✔
563
            Some(v)        => v.clone(),
16✔
564
            None        => self.create(key),
28✔
565
        }
566
    }
44✔
567

568
    pub fn create(&mut self, key: &url::Url) -> Entry {
100✔
569
        Entry::new(RwLock::new(EntryData::new(key)))
100✔
570
    }
100✔
571

572
    pub fn replace(&mut self, key: &url::Url, entry: &Entry) {
108✔
573
        self.entries.insert(key.clone(), entry.clone());
108✔
574
    }
108✔
575

576
    pub fn remove(&mut self, key: &url::Url) {
×
577
        self.entries.remove(key);
×
578
    }
×
579

580
    /// Removes the `num` oldest cache entries
581
    pub fn gc_oldest(&mut self, mut num: usize) {
×
582
        if num == 0 {
×
583
            return;
×
584
        }
×
585

586
        let mut tmp = Vec::with_capacity(self.entries.len());
×
587

588
        for (key, e) in &self.entries {
×
589
            let entry = match e.try_read() {
×
590
                Ok(e)        => e,
×
591
                _        => continue,
×
592
            };
593

594
            tmp.push((key.clone(), entry.get_cache_info().map(|c| c.local_time)));
×
595
        }
596

597
        tmp.sort_by(|(_, tm_a), (_, tm_b)| tm_a.cmp(tm_b));
×
598

599
        let mut rm_cnt = 0;
×
600

601
        for (key, _) in tmp {
×
602
            if num == 0 {
×
603
                break;
×
604
            }
×
605

606
            debug!("gc: removing old {}", key);
×
607
            self.entries.remove(&key);
×
608
            num -= 1;
×
609
            rm_cnt += 1;
×
610
        }
611

612
        if rm_cnt > 0 {
×
613
            info!("gc: removed {} old entries", rm_cnt);
×
614
        }
×
615
    }
×
616

617
    /// Removes cache entries which are older than `max_lt`.
618
    ///
619
    /// Returns the number of remaining cache entries.
620
    pub fn gc_outdated(&mut self, max_lt: Duration) -> usize {
×
621
        let now = Instant::now();
×
622
        let mut outdated = Vec::new();
×
623
        let mut cnt = 0;
×
624

625
        for (key, e) in &self.entries {
×
626
            match e.try_read().map(|v| v.is_outdated(now, max_lt)) {
×
627
                Ok(true)        => outdated.push(key.clone()),
×
628
                _                => cnt += 1,
×
629
            }
630
        }
631

632
        let rm_cnt = outdated.len();
×
633

634
        for e in outdated {
×
635
            debug!("gc: removing outdated {}", e);
×
636
            self.entries.remove(&e);
×
637
        }
638

639
        if rm_cnt > 0 {
×
640
            info!("gc: removed {} obsolete entries", rm_cnt);
×
641
        }
×
642

643
        cnt
×
644
    }
×
645
}
646

647
#[derive(Debug)]
648
pub struct GcProperties {
649
    pub max_elements:        usize,
650
    pub max_lifetime:        Duration,
651
    pub sleep:                Duration,
652
}
653

654
async fn gc_runner(props: GcProperties, mut abort_ch: tokio::sync::watch::Receiver<()>) {
2✔
655
    const RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
656

657
    loop {
658
        use std::sync::TryLockError;
659

660
        let sleep = {
2✔
661
            let cache = CACHE.try_write();
2✔
662

663
            match cache {
2✔
664
                Ok(mut cache) if !cache.is_empty()        => {
2✔
665
                    let cache_cnt = cache.gc_outdated(props.max_lifetime);
×
666

667
                    if cache_cnt > props.max_elements {
×
668
                        cache.gc_oldest(props.max_elements - cache_cnt)
×
669
                    }
×
670

671
                    props.sleep
×
672
                }
673
                Ok(_)                                => props.sleep,
2✔
674
                Err(TryLockError::WouldBlock)        => RETRY_DELAY,
×
675
                Err(e)                                => {
×
676
                    error!("cache gc failed with {:?}", e);
×
677
                    break;
×
678
                }
679
            }
680
        };
681

682
        if tokio::time::timeout(sleep, abort_ch.changed()).await.is_ok() {
2✔
683
            debug!("cache gc runner gracefully closed");
2✔
684
            break;
2✔
685
        }
×
686
    }
687
}
2✔
688

689
pub struct Cache();
690

691
impl Cache {
692
    #[instrument(level = "trace")]
693
    pub fn instanciate(tmpdir: &std::path::Path, props: GcProperties) {
694
        let mut cache = CACHE.write().unwrap();
695

696
        if cache.refcnt == 0 {
697
            let (tx, rx) = tokio::sync::watch::channel(());
698

699
            cache.tmpdir = tmpdir.into();
700
            cache.abort_ch = Some(tx);
701

702
            cache.gc = Some(tokio::task::spawn(gc_runner(props, rx)));
703
        }
704

705
        cache.refcnt += 1;
706
    }
707

708
    #[instrument(level = "trace")]
709
    // https://github.com/rust-lang/rust-clippy/issues/6446
710
    #[allow(clippy::await_holding_lock)]
711
    pub async fn close() {
712
        let mut cache = CACHE.write().unwrap();
713

714
        assert!(cache.refcnt > 0);
715

716
        cache.refcnt -= 1;
717

718
        if cache.refcnt == 0 {
719
            cache.entries.clear();
720

721
            let abort_ch = cache.abort_ch.take().unwrap();
722
            let gc = cache.gc.take().unwrap();
723

724
            drop(cache);
725

726
            abort_ch.send(()).unwrap();
727
            gc.await.unwrap();
728
        }
729
    }
730

731
    #[instrument(level = "trace", ret)]
732
    pub fn lookup_or_create(key: &url::Url) -> Entry {
733
        let mut cache = CACHE.write().unwrap();
734

735
        cache.lookup_or_create(key)
736
    }
737

738
    #[instrument(level = "trace", ret)]
739
    pub fn create(key: &url::Url) -> Entry {
740
        let mut cache = CACHE.write().unwrap();
741

742
        cache.create(key)
743
    }
744

745
    #[instrument(level = "trace")]
746
    pub fn replace(key: &url::Url, entry: &Entry) {
747
        let mut cache = CACHE.write().unwrap();
748

749
        cache.replace(key, entry)
750
    }
751

752
    #[instrument(level = "trace")]
753
    pub fn remove(key: &url::Url) {
754
        let mut cache = CACHE.write().unwrap();
755

756
        cache.remove(key)
757
    }
758

759
    pub fn get_client() -> Arc<reqwest::Client> {
116✔
760
        let cache = CACHE.read().unwrap();
116✔
761

762
        cache.get_client()
116✔
763
    }
116✔
764

765
    pub fn new_file() -> Result<std::fs::File> {
108✔
766
        let cache = CACHE.read().unwrap();
108✔
767

768
        Ok(tempfile::tempfile_in(&cache.tmpdir)?)
108✔
769
    }
108✔
770

771
    pub async fn dump() {
184✔
772
        let mut entries = Vec::new();
184✔
773

774
        {
184✔
775
            let cache = CACHE.read().unwrap();
184✔
776

184✔
777
            entries.reserve(cache.entries.len());
184✔
778
            entries.extend(cache.entries.values().cloned());
184✔
779
        }
184✔
780

781
        println!("Cache information ({} entries)", entries.len());
184✔
782

783
        for e in entries {
656✔
784
            println!("{}", e.read().await);
472✔
785
        }
786
    }
184✔
787

788
    pub async fn clear() {
12✔
789
        let mut cache = CACHE.write().unwrap();
12✔
790

791
        cache.clear();
12✔
792
    }
12✔
793
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc