• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5921125008

21 Aug 2023 02:02AM UTC coverage: 74.902% (-1.2%) from 76.06%
5921125008

push

github

web-flow
Wait for connectors to stop on shutdown (#1865)

* Wait for connectors to stop on shutdown

* Fix shutdown of object store connector

* Propagate errors in object store connector

338 of 338 new or added lines in 14 files covered. (100.0%)

46077 of 61516 relevant lines covered (74.9%)

39792.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/watcher.rs
1
use std::{sync::Arc, time::Duration};
2

3
use crate::shutdown::ShutdownReceiver;
4

5
use super::{state::LiveState, LiveError};
6

7
use dozer_types::log::info;
8
use notify::{RecursiveMode, Watcher};
9
use notify_debouncer_full::new_debouncer;
10
use tokio::runtime::Runtime;
11

12
pub async fn watch(
×
13
    runtime: &Arc<Runtime>,
×
14
    state: Arc<LiveState>,
×
15
    shutdown: ShutdownReceiver,
×
16
) -> Result<(), LiveError> {
×
17
    // setup debouncer
×
18
    let (tx, rx) = std::sync::mpsc::channel();
×
19

20
    let dir: std::path::PathBuf = std::env::current_dir()?;
×
21
    let mut debouncer = new_debouncer(Duration::from_millis(500), None, tx)?;
×
22

×
23
    let watcher = debouncer.watcher();
×
24

×
25
    watcher.watch(dir.as_path(), RecursiveMode::Recursive)?;
×
26

×
27
    debouncer
×
28
        .cache()
×
29
        .add_root(dir.as_path(), RecursiveMode::Recursive);
×
30

×
31
    let running = shutdown.get_running_flag().clone();
×
32
    loop {
×
33
        let event = rx.recv_timeout(Duration::from_millis(100));
×
34
        match event {
×
35
            Ok(result) => match result {
×
36
                Ok(_events) => {
×
37
                    build(runtime.clone(), state.clone()).await;
×
38
                }
×
39
                Err(errors) => errors.iter().for_each(|error| info!("{error:?}")),
×
40
            },
×
41
            Err(e) => {
×
42
                if !running.load(std::sync::atomic::Ordering::Relaxed) {
×
43
                    break;
×
44
                }
×
45
                if e == std::sync::mpsc::RecvTimeoutError::Disconnected {
×
46
                    break;
×
47
                }
×
48
            }
×
49
        }
50
    }
51

52
    Ok(())
×
53
}
×
54

×
55
pub async fn build(runtime: Arc<Runtime>, state: Arc<LiveState>) {
×
56
    state.set_dozer(None).await;
×
57

×
58
    state.broadcast().await;
×
59

×
60
    if let Err(res) = state.build(runtime).await {
×
61
        state.set_error_message(Some(res.to_string())).await;
×
62
    } else {
×
63
        state.set_error_message(None).await;
×
64
    }
65
    state.broadcast().await;
×
66
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc