• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/progress.rs
1
use std::{collections::HashMap, ops::Deref, sync::atomic::Ordering, time::Duration};
2

3
use dozer_types::grpc_types::live::{ConnectResponse, Metric, ProgressResponse};
4
use prometheus_parse::Value;
5
use tokio::time::interval;
6

7
use crate::shutdown::ShutdownReceiver;
8

9
use super::LiveError;
10

11
const PROGRESS_POLL_FREQUENCY: u64 = 100;
12
const METRICS_ENDPOINT: &str = "http://localhost:9000/metrics";
13
pub async fn progress_stream(
×
14
    tx: tokio::sync::broadcast::Sender<ConnectResponse>,
×
15
    shutdown_receiver: ShutdownReceiver,
×
16
) -> Result<(), LiveError> {
×
17
    let mut retry_interval = interval(Duration::from_millis(PROGRESS_POLL_FREQUENCY));
×
18

×
19
    let mut progress: HashMap<String, Metric> = HashMap::new();
×
20

21
    loop {
22
        if !shutdown_receiver.get_running_flag().load(Ordering::Relaxed) {
×
23
            return Ok(());
×
24
        }
×
25
        let text = reqwest::get(METRICS_ENDPOINT)
×
26
            .await?
×
27
            .error_for_status()?
×
28
            .text()
×
29
            .await?;
×
30
        let lines = text.lines().map(|line| Ok(line.to_string()));
×
31

32
        if let Ok(metrics) = prometheus_parse::Scrape::parse(lines) {
×
33
            for sample in metrics.samples {
×
34
                if let Value::Counter(count) = sample.value {
×
35
                    progress.insert(
×
36
                        sample.metric,
×
37
                        Metric {
×
38
                            value: count as u32,
×
39
                            labels: sample.labels.deref().clone(),
×
40
                            ts: sample.timestamp.timestamp_millis() as u32,
×
41
                        },
×
42
                    );
×
43
                }
×
44
            }
45

46
            if tx
×
47
                .send(ConnectResponse {
×
48
                    live: None,
×
49
                    progress: Some(ProgressResponse {
×
50
                        progress: progress.clone(),
×
51
                    }),
×
52
                })
×
53
                .is_err()
×
54
            {
55
                // If the receiver is dropped, we're done here.
56
                return Ok(());
×
57
            }
×
58
        }
×
59

60
        retry_interval.tick().await;
×
61
    }
62
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc