• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-orchestrator/src/cli/repl/sql.rs
1
use crate::errors::CliError;
2
use crate::utils::get_sql_history_path;
3
use std::collections::HashMap;
4

5
use std::io::{stdout, Stdout};
6
use std::sync::atomic::AtomicBool;
7
use std::sync::Arc;
8
use std::time::{Duration, Instant};
9

10
use crate::cli::{init_dozer, load_config};
11
use crate::errors::OrchestrationError;
12
use crossterm::{cursor, terminal, ExecutableCommand};
13
use dozer_cache::cache::index::get_primary_key;
14
use dozer_types::crossbeam::channel;
15
use dozer_types::log::{debug, error, info};
16
use dozer_types::prettytable::color;
17
use dozer_types::prettytable::{Cell, Row, Table};
18
use dozer_types::types::{Field, Operation, Schema};
19
use rustyline::error::ReadlineError;
20

21
use crate::Orchestrator;
22
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
23
use rustyline_derive::{Completer, Helper, Highlighter, Hinter};
24

25
/// Banner printed once when the SQL REPL starts.
///
/// Written with explicit escapes so the exact bytes (including the trailing
/// space after each sentence and the surrounding blank lines) are unambiguous.
const HELP: &str = "\nEnter your SQL below. \n\
Use semicolon at the end to run the query or Ctrl-C to cancel. \n\n";
30
#[derive(Completer, Helper, Highlighter, Hinter)]
31
struct SqlValidator {}
32

33
impl Validator for SqlValidator {
34
    fn validate(&self, ctx: &mut ValidationContext) -> rustyline::Result<ValidationResult> {
×
35
        use ValidationResult::{Incomplete, Valid};
×
36
        let input = ctx.input();
×
37
        let result = if !input.ends_with(';') {
×
38
            Incomplete
×
39
        } else {
40
            Valid(None)
×
41
        };
42
        Ok(result)
×
43
    }
×
44
}
45

46
pub fn editor(config_path: &String, running: Arc<AtomicBool>) -> Result<(), OrchestrationError> {
×
47
    use std::println as info;
×
48
    let h = SqlValidator {};
×
49
    let mut rl = rustyline::Editor::<SqlValidator>::new()
×
50
        .map_err(|e| OrchestrationError::CliError(CliError::ReadlineError(e)))?;
×
51

52
    rl.set_helper(Some(h));
×
53
    let config = load_config(config_path.clone())?;
×
54

55
    let history_path = get_sql_history_path(&config);
×
56
    if rl.load_history(history_path.as_path()).is_err() {
×
57
        debug!("No previous history file found.");
×
58
    }
×
59

60
    let mut stdout = stdout();
×
61
    // let mut out = terminal::
×
62

×
63
    info!("{HELP}");
×
64

65
    loop {
×
66
        let readline = rl.readline("sql>");
×
67
        match readline {
×
68
            Ok(line) => {
×
69
                rl.add_history_entry(line.as_str());
×
70
                if !line.is_empty() {
×
71
                    stdout
×
72
                        .execute(cursor::Hide)
×
73
                        .map_err(CliError::TerminalError)?;
×
74

75
                    query(line, config_path, running.clone(), &mut stdout)?;
×
76
                    stdout
×
77
                        .execute(cursor::Show)
×
78
                        .map_err(CliError::TerminalError)?;
×
79
                }
×
80
            }
81
            Err(ReadlineError::Interrupted) => {
82
                info!("Exiting..");
×
83
                break;
×
84
            }
85
            Err(ReadlineError::Eof) => {
86
                break;
×
87
            }
88
            Err(err) => {
×
89
                error!("Error: {:?}", err);
×
90

91
                break;
×
92
            }
93
        }
94
    }
95
    rl.save_history(&history_path)
×
96
        .map_err(|e| OrchestrationError::CliError(CliError::ReadlineError(e)))?;
×
97

98
    Ok(())
×
99
}
×
100

101
pub fn query(
×
102
    sql: String,
×
103
    config_path: &String,
×
104
    running: Arc<AtomicBool>,
×
105
    stdout: &mut Stdout,
×
106
) -> Result<(), OrchestrationError> {
×
107
    let dozer = init_dozer(config_path.to_owned())?;
×
108
    let (sender, receiver) = channel::unbounded::<Operation>();
×
109

×
110
    // set running
×
111
    running.store(true, std::sync::atomic::Ordering::Relaxed);
×
112

×
113
    stdout
×
114
        .execute(cursor::SavePosition)
×
115
        .map_err(CliError::TerminalError)?;
×
116
    let res = dozer.query(sql, sender, running);
×
117
    match res {
×
118
        Ok(schema) => {
×
119
            let mut record_map: HashMap<Vec<u8>, Vec<Field>> = HashMap::new();
×
120
            let mut idx: u64 = 0;
×
121

×
122
            let instant = Instant::now();
×
123
            let mut last_shown = Duration::from_millis(0);
×
124
            let mut prev_len = 0;
×
125
            let pkey_index = if schema.primary_index.is_empty() {
×
126
                vec![]
×
127
            } else {
128
                schema.primary_index.clone()
×
129
            };
130

131
            let mut updates_map = HashMap::new();
×
132

133
            loop {
×
134
                let msg = receiver.recv_timeout(Duration::from_millis(100));
×
135

×
136
                match msg {
×
137
                    Ok(msg) => {
×
138
                        match msg {
×
139
                            Operation::Delete { old } => {
×
140
                                let pkey = if pkey_index.is_empty() {
×
141
                                    idx.to_le_bytes().to_vec()
×
142
                                } else {
143
                                    get_primary_key(&pkey_index, &old.values)
×
144
                                };
145
                                record_map.remove(&pkey);
×
146
                                updates_map.insert(pkey, (0, instant.elapsed()));
×
147
                            }
148
                            Operation::Insert { new } => {
×
149
                                let pkey = if pkey_index.is_empty() {
×
150
                                    idx.to_le_bytes().to_vec()
×
151
                                } else {
152
                                    get_primary_key(&pkey_index, &new.values)
×
153
                                };
154
                                record_map.insert(pkey.clone(), new.values);
×
155
                                updates_map.insert(pkey, (1, instant.elapsed()));
×
156
                            }
157
                            Operation::Update { old, new } => {
×
158
                                let pkey = if pkey_index.is_empty() {
×
159
                                    idx.to_le_bytes().to_vec()
×
160
                                } else {
161
                                    get_primary_key(&pkey_index, &old.values)
×
162
                                };
163
                                let pkey2 = if pkey_index.is_empty() {
×
164
                                    idx.to_le_bytes().to_vec()
×
165
                                } else {
166
                                    get_primary_key(&pkey_index, &new.values)
×
167
                                };
168
                                record_map.remove(&pkey);
×
169

×
170
                                record_map.insert(pkey2.clone(), new.values);
×
171
                                if pkey2 == pkey {
×
172
                                    updates_map.insert(pkey2, (2, instant.elapsed()));
×
173
                                } else {
×
174
                                    updates_map.insert(pkey2, (2, instant.elapsed()));
×
175
                                    updates_map.insert(pkey.clone(), (0, instant.elapsed()));
×
176
                                }
×
177
                            }
178
                            Operation::SnapshottingDone { .. } => (),
×
179
                        }
×
180
                        idx += 1;
×
181
                    }
×
182
                    Err(e) => match e {
×
183
                        channel::RecvTimeoutError::Timeout => {}
×
184
                        channel::RecvTimeoutError::Disconnected => {
×
185
                            break;
×
186
                        }
187
                    },
188
                }
189

×
190
                if instant.elapsed() - last_shown > Duration::from_millis(100) {
×
191
                    last_shown = instant.elapsed();
×
192
                    display(
×
193
                        stdout,
×
194
                        instant,
×
195
                        &schema,
×
196
                        &record_map,
×
197
                        &updates_map,
×
198
                        prev_len,
×
199
                    )
×
200
                    .map_err(CliError::TerminalError)?;
×
201
                    prev_len = record_map.len();
×
202
                }
×
203
            }
204
            // Exit the pipeline
205
        }
206

×
207
        Err(e) => {
×
208
            error!("{}", e);
×
209
        }
210
    }
×
211
    Ok(())
×
212
}
×
213

×
214
fn display(
×
215
    stdout: &mut Stdout,
×
216
    instant: Instant,
×
217
    schema: &Schema,
×
218
    record_map: &HashMap<Vec<u8>, Vec<Field>>,
×
219
    updates_map: &HashMap<Vec<u8>, (i32, Duration)>,
×
220
    prev_len: usize,
×
221
) -> Result<(), crossterm::ErrorKind> {
×
222
    let mut table = Table::new();
×
223

×
224
    // Fields Row
×
225

×
226
    let mut cells = vec![];
×
227
    for f in &schema.fields {
×
228
        cells.push(Cell::new(&f.name));
×
229
    }
×
230
    table.add_row(Row::new(cells));
×
231

×
232
    for (key, values) in record_map {
×
233
        let mut cells = vec![];
×
234
        for v in values {
×
235
            let val_str = v.to_string().map_or("".to_string(), |v| v);
×
236
            let mut c = Cell::new(&val_str);
×
237

×
238
            let upd = updates_map.get(key);
×
239

×
240
            let co = match upd {
×
241
                Some((idx, dur)) => {
×
242
                    if (instant.elapsed() - *dur) < Duration::from_millis(1000) {
×
243
                        if *idx == 0 {
×
244
                            color::BRIGHT_RED
×
245
                        } else if *idx == 1 {
×
246
                            color::GREEN
×
247
                        } else {
×
248
                            color::YELLOW
×
249
                        }
250
                    } else {
×
251
                        color::WHITE
×
252
                    }
253
                }
×
254
                None => color::WHITE,
×
255
            };
256

×
257
            c.style(dozer_types::prettytable::Attr::ForegroundColor(co));
×
258
            cells.push(c);
×
259
        }
×
260
        table.add_row(Row::new(cells));
×
261
    }
×
262
    stdout.execute(cursor::RestorePosition)?;
×
263
    table.printstd();
×
264
    stdout.execute(cursor::MoveUp(1))?;
×
265

×
266
    let diff = prev_len as i64 - record_map.len() as i64;
×
267
    if diff > 0 {
×
268
        for _i in 0..diff * 3 {
×
269
            stdout.execute(cursor::MoveDown(1))?;
×
270
            stdout.execute(terminal::Clear(terminal::ClearType::CurrentLine))?;
×
271
        }
×
272
    }
×
273
    stdout.execute(cursor::MoveDown(1))?;
×
274
    Ok(())
×
275
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc