• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-orchestrator/src/cli/repl/sql.rs
1
use crate::errors::CliError;
2
use crate::utils::get_sql_history_path;
3
use std::collections::HashMap;
4

5
use std::io::{stdout, Stdout};
6
use std::sync::atomic::AtomicBool;
7
use std::sync::Arc;
8
use std::time::{Duration, Instant};
9

10
use crate::cli::{init_dozer, load_config};
11
use crate::errors::OrchestrationError;
12
use crate::Orchestrator;
13
use crossterm::{cursor, terminal, ExecutableCommand};
14
use dozer_cache::cache::index::get_primary_key;
15
use dozer_types::crossbeam::channel;
16
use dozer_types::log::{debug, error, info};
17
use dozer_types::prettytable::color;
18
use dozer_types::prettytable::{Cell, Row, Table};
19
use dozer_types::types::{Field, Operation, Schema};
20
use rustyline::error::ReadlineError;
21

22
use rustyline::validate::{ValidationContext, ValidationResult, Validator};
23
use rustyline_derive::{Completer, Helper, Highlighter, Hinter};
24

25
/// Banner printed once when the SQL prompt starts.
///
/// Written with explicit `\n` escapes (rather than a multi-line raw string)
/// so the trailing space at the end of each sentence is visible and cannot be
/// stripped by an editor.
const HELP: &str = concat!(
    "\n",
    "Enter your SQL below. \n",
    "Use semicolon at the end to run the query or Ctrl-C to cancel. \n",
    "\n",
);
30
#[derive(Completer, Helper, Highlighter, Hinter)]
31
struct SqlValidator {}
32

33
impl Validator for SqlValidator {
34
    fn validate(&self, ctx: &mut ValidationContext) -> rustyline::Result<ValidationResult> {
×
35
        use ValidationResult::{Incomplete, Valid};
×
36
        let input = ctx.input();
×
37
        let result = if !input.ends_with(';') {
×
38
            Incomplete
×
39
        } else {
40
            Valid(None)
×
41
        };
42
        Ok(result)
×
43
    }
×
44
}
45

46
pub fn editor(config_path: &String, running: Arc<AtomicBool>) -> Result<(), OrchestrationError> {
×
47
    use std::println as info;
×
48
    let h = SqlValidator {};
×
49
    let mut rl = rustyline::Editor::<SqlValidator>::new()
×
50
        .map_err(|e| OrchestrationError::CliError(CliError::ReadlineError(e)))?;
×
51

52
    rl.set_helper(Some(h));
×
53
    let config = load_config(config_path.clone())?;
×
54

55
    let history_path = get_sql_history_path(&config);
×
56
    if rl.load_history(history_path.as_path()).is_err() {
×
57
        debug!("No previous history file found.");
×
58
    }
×
59

60
    let mut stdout = stdout();
×
61
    // let mut out = terminal::
×
62

×
63
    info!("{HELP}");
×
64

65
    loop {
×
66
        let readline = rl.readline("sql>");
×
67
        match readline {
×
68
            Ok(line) => {
×
69
                rl.add_history_entry(line.as_str());
×
70
                if !line.is_empty() {
×
71
                    stdout
×
72
                        .execute(cursor::Hide)
×
73
                        .map_err(CliError::TerminalError)?;
×
74

75
                    query(line, config_path, running.clone(), &mut stdout)?;
×
76
                    stdout
×
77
                        .execute(cursor::Show)
×
78
                        .map_err(CliError::TerminalError)?;
×
79
                }
×
80
            }
81
            Err(ReadlineError::Interrupted) => {
82
                info!("Exiting..");
×
83
                break;
×
84
            }
85
            Err(ReadlineError::Eof) => {
86
                break;
×
87
            }
88
            Err(err) => {
×
89
                error!("Error: {:?}", err);
×
90

91
                break;
×
92
            }
93
        }
94
    }
95
    rl.save_history(&history_path)
×
96
        .map_err(|e| OrchestrationError::CliError(CliError::ReadlineError(e)))?;
×
97

98
    Ok(())
×
99
}
×
100

101
pub fn query(
×
102
    sql: String,
×
103
    config_path: &String,
×
104
    running: Arc<AtomicBool>,
×
105
    stdout: &mut Stdout,
×
106
) -> Result<(), OrchestrationError> {
×
107
    let dozer = init_dozer(config_path.to_owned())?;
×
108
    let (sender, receiver) = channel::unbounded::<Operation>();
×
109

×
110
    // set running
×
111
    running.store(true, std::sync::atomic::Ordering::Relaxed);
×
112

×
113
    stdout
×
114
        .execute(cursor::SavePosition)
×
115
        .map_err(CliError::TerminalError)?;
×
116
    let res = dozer.query(sql, sender, running);
×
117
    match res {
×
118
        Ok(schema) => {
×
119
            let mut record_map: HashMap<Vec<u8>, Vec<Field>> = HashMap::new();
×
120
            let mut idx: u64 = 0;
×
121

×
122
            let instant = Instant::now();
×
123
            let mut last_shown = Duration::from_millis(0);
×
124
            let mut prev_len = 0;
×
125
            let pkey_index = if schema.primary_index.is_empty() {
×
126
                vec![]
×
127
            } else {
128
                schema.primary_index.clone()
×
129
            };
130

131
            let mut updates_map = HashMap::new();
×
132

133
            loop {
×
134
                let msg = receiver.recv_timeout(Duration::from_millis(100));
×
135

×
136
                match msg {
×
137
                    Ok(msg) => {
×
138
                        match msg {
×
139
                            Operation::Delete { old } => {
×
140
                                let pkey = if pkey_index.is_empty() {
×
141
                                    idx.to_le_bytes().to_vec()
×
142
                                } else {
143
                                    get_primary_key(&pkey_index, &old.values)
×
144
                                };
145
                                record_map.remove(&pkey);
×
146
                                updates_map.insert(pkey, (0, instant.elapsed()));
×
147
                            }
148
                            Operation::Insert { new } => {
×
149
                                let pkey = if pkey_index.is_empty() {
×
150
                                    idx.to_le_bytes().to_vec()
×
151
                                } else {
152
                                    get_primary_key(&pkey_index, &new.values)
×
153
                                };
154
                                record_map.insert(pkey.clone(), new.values);
×
155
                                updates_map.insert(pkey, (1, instant.elapsed()));
×
156
                            }
157
                            Operation::Update { old, new } => {
×
158
                                let pkey = if pkey_index.is_empty() {
×
159
                                    idx.to_le_bytes().to_vec()
×
160
                                } else {
161
                                    get_primary_key(&pkey_index, &old.values)
×
162
                                };
163
                                let pkey2 = if pkey_index.is_empty() {
×
164
                                    idx.to_le_bytes().to_vec()
×
165
                                } else {
166
                                    get_primary_key(&pkey_index, &new.values)
×
167
                                };
168
                                record_map.remove(&pkey);
×
169

×
170
                                record_map.insert(pkey2.clone(), new.values);
×
171
                                if pkey2 == pkey {
×
172
                                    updates_map.insert(pkey2, (2, instant.elapsed()));
×
173
                                } else {
×
174
                                    updates_map.insert(pkey2, (2, instant.elapsed()));
×
175
                                    updates_map.insert(pkey.clone(), (0, instant.elapsed()));
×
176
                                }
×
177
                            }
178
                        }
179
                        idx += 1;
×
180
                    }
181
                    Err(e) => match e {
×
182
                        channel::RecvTimeoutError::Timeout => {}
×
183
                        channel::RecvTimeoutError::Disconnected => {
184
                            break;
×
185
                        }
186
                    },
187
                }
188

189
                if instant.elapsed() - last_shown > Duration::from_millis(100) {
×
190
                    last_shown = instant.elapsed();
×
191
                    display(
×
192
                        stdout,
×
193
                        instant,
×
194
                        &schema,
×
195
                        &record_map,
×
196
                        &updates_map,
×
197
                        prev_len,
×
198
                    )
×
199
                    .map_err(CliError::TerminalError)?;
×
200
                    prev_len = record_map.len();
×
201
                }
×
202
            }
203
            // Exit the pipeline
204
        }
205

206
        Err(e) => {
×
207
            error!("{}", e);
×
208
        }
209
    }
210
    Ok(())
×
211
}
×
212

213
fn display(
×
214
    stdout: &mut Stdout,
×
215
    instant: Instant,
×
216
    schema: &Schema,
×
217
    record_map: &HashMap<Vec<u8>, Vec<Field>>,
×
218
    updates_map: &HashMap<Vec<u8>, (i32, Duration)>,
×
219
    prev_len: usize,
×
220
) -> Result<(), crossterm::ErrorKind> {
×
221
    let mut table = Table::new();
×
222

×
223
    // Fields Row
×
224

×
225
    let mut cells = vec![];
×
226
    for f in &schema.fields {
×
227
        cells.push(Cell::new(&f.name));
×
228
    }
×
229
    table.add_row(Row::new(cells));
×
230

231
    for (key, values) in record_map {
×
232
        let mut cells = vec![];
×
233
        for v in values {
×
234
            let val_str = v.to_string().map_or("".to_string(), |v| v);
×
235
            let mut c = Cell::new(&val_str);
×
236

×
237
            let upd = updates_map.get(key);
×
238

239
            let co = match upd {
×
240
                Some((idx, dur)) => {
×
241
                    if (instant.elapsed() - *dur) < Duration::from_millis(1000) {
×
242
                        if *idx == 0 {
×
243
                            color::BRIGHT_RED
×
244
                        } else if *idx == 1 {
×
245
                            color::GREEN
×
246
                        } else {
247
                            color::YELLOW
×
248
                        }
249
                    } else {
250
                        color::WHITE
×
251
                    }
252
                }
253
                None => color::WHITE,
×
254
            };
255

256
            c.style(dozer_types::prettytable::Attr::ForegroundColor(co));
×
257
            cells.push(c);
×
258
        }
259
        table.add_row(Row::new(cells));
×
260
    }
261
    stdout.execute(cursor::RestorePosition)?;
×
262
    table.printstd();
×
263
    stdout.execute(cursor::MoveUp(1))?;
×
264

265
    let diff = prev_len as i64 - record_map.len() as i64;
×
266
    if diff > 0 {
×
267
        for _i in 0..diff * 3 {
×
268
            stdout.execute(cursor::MoveDown(1))?;
×
269
            stdout.execute(terminal::Clear(terminal::ClearType::CurrentLine))?;
×
270
        }
271
    }
×
272
    stdout.execute(cursor::MoveDown(1))?;
×
273
    Ok(())
×
274
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc