• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5869050642

pending completion
5869050642

Pull #1858

github

supergi0
updated readme
Pull Request #1858: feat: Implement graph for dozer-live ui

419 of 419 new or added lines in 15 files covered. (100.0%)

46002 of 59761 relevant lines covered (76.98%)

52423.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/progress.rs
1
use std::{collections::HashMap, ops::Deref, sync::atomic::Ordering, time::Duration};
2

3
use dozer_types::grpc_types::live::{ConnectResponse, Metric, ProgressResponse};
4
use prometheus_parse::Value;
5
use tokio::time::interval;
6

7
use crate::shutdown::ShutdownReceiver;
8

9
use super::LiveError;
10

11
/// Polling period in milliseconds: `progress_stream` passes this value to
/// `Duration::from_millis` when building its interval timer, so despite the
/// name it is a period between polls, not a frequency.
const PROGRESS_POLL_FREQUENCY: u64 = 100;
12
/// URL that `progress_stream` fetches with `reqwest::get` and parses as
/// Prometheus text-format metrics.
const METRICS_ENDPOINT: &str = "http://localhost:9000/metrics";
13
pub async fn progress_stream(
×
14
    tx: tokio::sync::broadcast::Sender<ConnectResponse>,
×
15
    shutdown_receiver: ShutdownReceiver,
×
16
) -> Result<(), LiveError> {
×
17
    let mut retry_interval = interval(Duration::from_millis(PROGRESS_POLL_FREQUENCY));
×
18

×
19
    let mut progress: HashMap<String, Metric> = HashMap::new();
×
20

21
    loop {
×
22
        if !shutdown_receiver.get_running_flag().load(Ordering::Relaxed) {
×
23
            return Ok(());
×
24
        }
×
25
        let lines = match reqwest::get(METRICS_ENDPOINT).await {
×
26
            Ok(texts) => texts.text().await.map_or(vec![], |body| {
×
27
                body.lines().map(|s| Ok(s.to_owned())).collect()
×
28
            }),
×
29
            Err(e) => {
×
30
                return Err(LiveError::BoxedError(Box::new(e)));
×
31
            }
32
        };
33

34
        if let Ok(metrics) = prometheus_parse::Scrape::parse(lines.into_iter()) {
×
35
            for sample in metrics.samples {
×
36
                if let Value::Counter(count) = sample.value {
×
37
                    progress.insert(
×
38
                        sample.metric,
×
39
                        Metric {
×
40
                            value: count as u32,
×
41
                            labels: sample.labels.deref().clone(),
×
42
                            ts: sample.timestamp.timestamp_millis() as u32,
×
43
                        },
×
44
                    );
×
45
                }
×
46
            }
47

48
            tx.send(ConnectResponse {
×
49
                live: None,
×
50
                progress: Some(ProgressResponse {
×
51
                    progress: progress.clone(),
×
52
                }),
×
53
            })
×
54
            .map_err(|e| LiveError::BoxedError(Box::new(e)))?;
×
55
            retry_interval.tick().await;
×
56
        } else {
57
            retry_interval.tick().await;
×
58
        }
59
    }
60
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc