• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5974154639

25 Aug 2023 09:17AM UTC coverage: 76.271% (+0.02%) from 76.247%
5974154639

push

github

web-flow
Make file watcher non-blocking (#1918)

20 of 20 new or added lines in 1 file covered. (100.0%)

47189 of 61870 relevant lines covered (76.27%)

48383.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/watcher.rs
1
use std::{sync::Arc, time::Duration};
2

3
use crate::shutdown::ShutdownReceiver;
4

5
use super::{state::LiveState, LiveError};
6

7
use crate::live::state::BroadcastType;
8
use dozer_types::log::info;
9
use notify::{RecursiveMode, Watcher};
10
use notify_debouncer_full::new_debouncer;
11
use tokio::{runtime::Runtime, select};
12

13
pub async fn watch(
×
14
    runtime: &Arc<Runtime>,
×
15
    state: Arc<LiveState>,
×
16
    shutdown: ShutdownReceiver,
×
17
) -> Result<(), LiveError> {
×
18
    // setup debouncer
×
19
    let (tx, rx) = std::sync::mpsc::channel();
×
20

21
    let dir: std::path::PathBuf = std::env::current_dir()?;
×
22
    let mut debouncer = new_debouncer(Duration::from_millis(500), None, tx)?;
×
23
    debouncer
×
24
        .cache()
×
25
        .add_root(dir.as_path(), RecursiveMode::Recursive);
×
26
    let watcher = debouncer.watcher();
×
27

×
28
    watcher.watch(dir.as_path(), RecursiveMode::NonRecursive)?;
×
29

30
    let additional_paths = vec![dir.join("sql")];
×
31

×
32
    for path in additional_paths {
×
33
        let _ = watcher.watch(path.as_path(), RecursiveMode::NonRecursive);
×
34
    }
×
35

×
36
    let (async_sender, mut async_receiver) = tokio::sync::mpsc::channel(10);
×
37

×
38
    // Thread that adapts the sync watcher channel to an async channel
×
39
    let adapter = runtime.spawn_blocking(move || loop {
×
40
        let res = rx.recv();
×
41
        let Ok(msg) = res else {
×
42
            break;
×
43
        };
×
44
        let _ = async_sender.blocking_send(msg);
×
45
    });
×
46

×
47
    loop {
48
        select! {
×
49
            Some(msg) = async_receiver.recv() => match msg {
×
50
                Ok(_events) => {
×
51
                    build(runtime.clone(), state.clone()).await;
×
52
                }
×
53
                Err(errors) => errors.iter().for_each(|error| info!("{error:?}")),
×
54
            },
×
55
            // We are shutting down
56
            _ = shutdown.create_shutdown_future() => break,
57
            // The watcher quit
58
            else => break
59
        }
×
60
    }
×
61

62
    // Drop the channels that may keep the adapter thread alive
×
63
    drop(async_receiver);
×
64
    drop(debouncer);
×
65

×
66
    let _ = adapter.await;
×
67

68
    Ok(())
×
69
}
×
70

×
71
async fn build(runtime: Arc<Runtime>, state: Arc<LiveState>) {
×
72
    state.broadcast(BroadcastType::Start).await;
×
73
    if let Err(res) = state.build(runtime).await {
×
74
        let message = res.to_string();
×
75
        state.broadcast(BroadcastType::Failed(message)).await;
×
76
    } else {
77
        state.broadcast(BroadcastType::Success).await;
×
78
    }
79
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc