• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5973865399

25 Aug 2023 09:11AM UTC coverage: 75.4%. First build
5973865399

Pull #1918

github

Jesse-Bakker
Make file watcher non-blocking
Pull Request #1918: Make file watcher non-blocking

20 of 20 new or added lines in 1 file covered. (100.0%)

46983 of 62312 relevant lines covered (75.4%)

47954.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/progress.rs
1
use std::{collections::HashMap, ops::Deref, sync::atomic::Ordering, time::Duration};
2

3
use dozer_types::grpc_types::live::{ConnectResponse, Metric, ProgressResponse};
4
use prometheus_parse::Value;
5
use tokio::time::interval;
6

7
use crate::shutdown::ShutdownReceiver;
8

9
use super::LiveError;
10

11
const PROGRESS_POLL_FREQUENCY: u64 = 100;
12
const METRICS_ENDPOINT: &str = "http://localhost:9000/metrics";
13
pub async fn progress_stream(
×
14
    tx: tokio::sync::broadcast::Sender<ConnectResponse>,
×
15
    shutdown_receiver: ShutdownReceiver,
×
16
) -> Result<(), LiveError> {
×
17
    let mut retry_interval = interval(Duration::from_millis(PROGRESS_POLL_FREQUENCY));
×
18

×
19
    let mut progress: HashMap<String, Metric> = HashMap::new();
×
20

21
    loop {
22
        if !shutdown_receiver.get_running_flag().load(Ordering::Relaxed) {
×
23
            return Ok(());
×
24
        }
×
25
        let text = reqwest::get(METRICS_ENDPOINT)
×
26
            .await?
×
27
            .error_for_status()?
×
28
            .text()
×
29
            .await?;
×
30
        let lines = text.lines().map(|line| Ok(line.to_string()));
×
31

32
        if let Ok(metrics) = prometheus_parse::Scrape::parse(lines) {
×
33
            for sample in metrics.samples {
×
34
                if let Value::Counter(count) = sample.value {
×
35
                    progress.insert(
×
36
                        sample.metric,
×
37
                        Metric {
×
38
                            value: count as u32,
×
39
                            labels: sample.labels.deref().clone(),
×
40
                            ts: sample.timestamp.timestamp_millis() as u32,
×
41
                        },
×
42
                    );
×
43
                }
×
44
            }
45

46
            if tx
×
47
                .send(ConnectResponse {
×
48
                    live: None,
×
49
                    progress: Some(ProgressResponse {
×
50
                        progress: progress.clone(),
×
51
                    }),
×
52
                    build: None,
×
53
                })
×
54
                .is_err()
×
55
            {
56
                // If the receiver is dropped, we're done here.
57
                return Ok(());
×
58
            }
×
59
        }
×
60

61
        retry_interval.tick().await;
×
62
    }
63
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc