• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5829059745

pending completion
5829059745

Pull #1844

github

supergi01
added comments for downloader.rs
Pull Request #1844: feat/live-reload, download and start react server

735 of 735 new or added lines in 11 files covered. (100.0%)

45536 of 61287 relevant lines covered (74.3%)

51206.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/state.rs
1
use std::thread::JoinHandle;
2

3
use clap::Parser;
4
use dozer_api::grpc::types_helper::map_field_definitions;
5
use dozer_cache::dozer_log::{
6
    reader::{LogReaderBuilder, LogReaderOptions},
7
    replication::LogOperation,
8
};
9
use dozer_core::{app::AppPipeline, dag_schemas::DagSchemas, petgraph::dot};
10
use dozer_ingestion::connectors::get_connector;
11
use dozer_sql::pipeline::builder::statement_to_pipeline;
12
use dozer_types::{
13
    grpc_types::{
14
        live::{DotResponse, LiveApp, LiveResponse, Schema, SchemasResponse, SqlResponse},
15
        types::Operation,
16
    },
17
    indicatif::MultiProgress,
18
    log::info,
19
    models::{
20
        api_config::{ApiConfig, AppGrpcOptions},
21
        api_endpoint::ApiEndpoint,
22
    },
23
    parking_lot::RwLock,
24
};
25

26
use crate::{
27
    cli::{init_dozer, types::Cli},
28
    errors::OrchestrationError,
29
    live::helper::map_operation,
30
    pipeline::PipelineBuilder,
31
    shutdown::{self, ShutdownReceiver, ShutdownSender},
32
    simple::SimpleOrchestrator,
33
    utils::get_app_grpc_config,
34
};
35

36
use super::LiveError;
37

38
pub struct LiveState {
39
    dozer: RwLock<Option<SimpleOrchestrator>>,
40
    sql_thread: RwLock<Option<ShutdownSender>>,
41
}
×
42

×
43
impl LiveState {
×
44
    pub fn new() -> Self {
×
45
        Self {
×
46
            dozer: RwLock::new(None),
×
47
            sql_thread: RwLock::new(None),
×
48
        }
×
49
    }
×
50

×
51
    pub fn get_dozer(&self) -> Result<SimpleOrchestrator, LiveError> {
×
52
        match self.dozer.read().as_ref() {
×
53
            Some(dozer) => Ok(dozer.clone()),
×
54
            None => Err(LiveError::NotInitialized),
×
55
        }
×
56
    }
×
57

×
58
    pub fn set_dozer(&self, dozer: SimpleOrchestrator) {
×
59
        let mut lock = self.dozer.write();
×
60
        *lock = Some(dozer);
×
61
    }
×
62

×
63
    pub fn build(&self) -> Result<(), LiveError> {
×
64
        let cli = Cli::parse();
×
65

×
66
        let res = init_dozer(
×
67
            cli.config_paths.clone(),
×
68
            cli.config_token.clone(),
×
69
            cli.config_overrides.clone(),
×
70
        )?;
×
71

×
72
        self.set_dozer(res);
×
73
        Ok(())
×
74
    }
×
75
    pub fn get_current(&self) -> Result<LiveResponse, LiveError> {
×
76
        let dozer = self.get_dozer();
×
77
        match dozer {
×
78
            Ok(dozer) => {
×
79
                let connections = dozer
×
80
                    .config
×
81
                    .connections
×
82
                    .into_iter()
×
83
                    .map(|c| c.name)
×
84
                    .collect();
×
85
                let endpoints = dozer.config.endpoints.into_iter().map(|c| c.name).collect();
×
86
                let app = LiveApp {
×
87
                    app_name: dozer.config.app_name,
×
88
                    connections,
×
89
                    endpoints,
×
90
                };
×
91
                Ok(LiveResponse {
×
92
                    initialized: true,
×
93
                    error_message: None,
×
94
                    app: Some(app),
×
95
                })
×
96
            }
×
97
            Err(e) => Ok(LiveResponse {
×
98
                initialized: false,
×
99
                error_message: Some(e.to_string()),
×
100
                app: None,
×
101
            }),
×
102
        }
×
103
    }
×
104

×
105
    pub fn get_sql(&self) -> Result<SqlResponse, LiveError> {
×
106
        let dozer = self.get_dozer()?;
×
107
        let sql = dozer.config.sql.clone().unwrap_or_default();
×
108
        Ok(SqlResponse { sql })
×
109
    }
×
110
    pub fn get_endpoints_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
111
        let dozer = self.get_dozer()?;
×
112
        get_endpoint_schemas(dozer).map_err(|e| LiveError::BuildError(Box::new(e)))
×
113
    }
×
114
    pub async fn get_source_schemas(
×
115
        &self,
×
116
        connection_name: String,
×
117
    ) -> Result<SchemasResponse, LiveError> {
×
118
        let dozer = self.get_dozer()?;
×
119
        get_source_schemas(dozer, connection_name)
×
120
            .await
×
121
            .map_err(|e| LiveError::BuildError(Box::new(e)))
×
122
    }
×
123

×
124
    pub fn generate_dot(&self) -> Result<DotResponse, LiveError> {
×
125
        let dozer = self.get_dozer()?;
×
126
        generate_dot(dozer).map_err(|e| LiveError::BuildError(Box::new(e)))
×
127
    }
×
128

×
129
    pub fn build_sql(&self, sql: String) -> Result<SchemasResponse, LiveError> {
×
130
        let mut dozer = self.get_dozer()?;
×
131

132
        let context = statement_to_pipeline(&sql, &mut AppPipeline::new(), None)
×
133
            .map_err(LiveError::PipelineError)?;
×
134

×
135
        //overwrite sql
×
136
        dozer.config.sql = Some(sql);
×
137

×
138
        dozer.config.endpoints = vec![];
×
139
        let endpoints = context.output_tables_map.keys().collect::<Vec<_>>();
×
140
        for endpoint in endpoints {
×
141
            let endpoint = ApiEndpoint {
×
142
                name: endpoint.to_string(),
×
143
                table_name: endpoint.to_string(),
×
144
                path: format!("/{}", endpoint),
×
145
                ..Default::default()
×
146
            };
×
147
            dozer.config.endpoints.push(endpoint);
×
148
        }
×
149

×
150
        get_endpoint_schemas(dozer).map_err(|e| LiveError::BuildError(Box::new(e)))
×
151
    }
×
152

×
153
    pub fn run_sql(
×
154
        &self,
×
155
        sql: String,
×
156
        endpoints: Vec<String>,
×
157
        sender: tokio::sync::mpsc::Sender<Result<Operation, tonic::Status>>,
×
158
    ) -> Result<(), LiveError> {
×
159
        let dozer = self.get_dozer()?;
×
160

×
161
        // kill if a handle already exists
×
162
        self.stop_sql();
×
163

×
164
        let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
×
165
        let _handle = run_sql(dozer, sql, endpoints, sender, shutdown_receiver)
×
166
            .map_err(|e| LiveError::BuildError(Box::new(e)))?;
×
167
        let mut lock = self.sql_thread.write();
×
168
        *lock = Some(shutdown_sender);
×
169
        Ok(())
×
170
    }
×
171

×
172
    pub fn stop_sql(&self) {
×
173
        let mut lock = self.sql_thread.write();
×
174
        if let Some(shutdown) = lock.take() {
×
175
            shutdown.shutdown()
×
176
        }
×
177
        *lock = None;
×
178
    }
×
179
}
×
180

×
181
pub async fn get_source_schemas(
×
182
    dozer: SimpleOrchestrator,
×
183
    connection_name: String,
×
184
) -> Result<SchemasResponse, OrchestrationError> {
×
185
    let connection = dozer
×
186
        .config
×
187
        .connections
×
188
        .iter()
×
189
        .find(|c| c.name == connection_name)
×
190
        .unwrap();
×
191

×
192
    let connector =
×
193
        get_connector(connection.clone()).map_err(|e| LiveError::BuildError(Box::new(e)))?;
×
194

×
195
    let (tables, schemas) = connector
×
196
        .list_all_schemas()
×
197
        .await
×
198
        .map_err(|e| LiveError::BuildError(Box::new(e)))?;
×
199

×
200
    let schemas = schemas
×
201
        .into_iter()
×
202
        .zip(tables)
×
203
        .map(|(s, table)| {
×
204
            let schema = s.schema;
×
205
            let primary_index = schema.primary_index.into_iter().map(|i| i as i32).collect();
×
206
            let schema = Schema {
×
207
                primary_index,
×
208
                fields: map_field_definitions(schema.fields),
×
209
            };
×
210
            (table.name, schema)
×
211
        })
×
212
        .collect();
×
213

×
214
    Ok(SchemasResponse { schemas })
×
215
}
×
216

×
217
pub fn get_endpoint_schemas(
×
218
    dozer: SimpleOrchestrator,
×
219
) -> Result<SchemasResponse, OrchestrationError> {
×
220
    // Calculate schemas.
×
221
    let endpoint_and_logs = dozer
×
222
        .config
×
223
        .endpoints
×
224
        .iter()
×
225
        // We're not really going to run the pipeline, so we don't create logs.
×
226
        .map(|endpoint| (endpoint.clone(), None))
×
227
        .collect();
×
228
    let builder = PipelineBuilder::new(
×
229
        &dozer.config.connections,
×
230
        &dozer.config.sources,
×
231
        dozer.config.sql.as_deref(),
×
232
        endpoint_and_logs,
×
233
        MultiProgress::new(),
×
234
    );
×
235
    let dag = builder.build(dozer.runtime.clone())?;
×
236
    // Populate schemas.
×
237
    let dag_schemas = DagSchemas::new(dag)?;
×
238

×
239
    let schemas = dag_schemas.get_sink_schemas();
×
240

×
241
    let schemas = schemas
×
242
        .into_iter()
×
243
        .map(|(name, tuple)| {
×
244
            let (schema, _) = tuple;
×
245
            let primary_index = schema.primary_index.into_iter().map(|i| i as i32).collect();
×
246
            let schema = Schema {
×
247
                primary_index,
×
248
                fields: map_field_definitions(schema.fields),
×
249
            };
×
250
            (name, schema)
×
251
        })
×
252
        .collect();
×
253
    Ok(SchemasResponse { schemas })
×
254
}
×
255

×
256
pub fn generate_dot(dozer: SimpleOrchestrator) -> Result<DotResponse, OrchestrationError> {
×
257
    // Calculate schemas.
×
258
    let endpoint_and_logs = dozer
×
259
        .config
×
260
        .endpoints
×
261
        .iter()
×
262
        // We're not really going to run the pipeline, so we don't create logs.
×
263
        .map(|endpoint| (endpoint.clone(), None))
×
264
        .collect();
×
265
    let builder = PipelineBuilder::new(
×
266
        &dozer.config.connections,
×
267
        &dozer.config.sources,
×
268
        dozer.config.sql.as_deref(),
×
269
        endpoint_and_logs,
×
270
        MultiProgress::new(),
×
271
    );
×
272
    let dag = builder.build(dozer.runtime.clone())?;
×
273
    // Populate schemas.
×
274

×
275
    let dot_str = dot::Dot::new(dag.graph()).to_string();
×
276

×
277
    Ok(DotResponse { dot: dot_str })
×
278
}
×
279

×
280
pub fn run_sql(
×
281
    dozer: SimpleOrchestrator,
×
282
    sql: String,
×
283
    endpoints: Vec<String>,
×
284
    sender: tokio::sync::mpsc::Sender<Result<Operation, tonic::Status>>,
×
285
    shutdown_receiver: ShutdownReceiver,
×
286
) -> Result<JoinHandle<()>, OrchestrationError> {
×
287
    let mut dozer = dozer;
×
288

×
289
    let runtime = dozer.runtime.clone();
×
290
    //overwrite sql
×
291
    dozer.config.sql = Some(sql);
×
292

×
293
    dozer.config.endpoints = vec![];
×
294
    for endpoint in &endpoints {
×
295
        dozer.config.endpoints.push(ApiEndpoint {
×
296
            name: endpoint.clone(),
×
297
            table_name: endpoint.clone(),
×
298
            path: format!("/{}", endpoint),
×
299
            ..Default::default()
×
300
        })
×
301
    }
×
302

×
303
    dozer.config.api = Some(ApiConfig {
×
304
        app_grpc: Some(AppGrpcOptions {
×
305
            port: 5678,
×
306
            host: "0.0.0.0".to_string(),
×
307
        }),
×
308

×
309
        ..Default::default()
×
310
    });
×
311

×
312
    dozer.config.home_dir = tempdir::TempDir::new("live")
×
313
        .unwrap()
×
314
        .into_path()
×
315
        .to_string_lossy()
×
316
        .to_string();
×
317

×
318
    let internal_grpc_config = get_app_grpc_config(&dozer.config);
×
319
    let app_server_addr = format!(
×
320
        "http://{}:{}",
×
321
        internal_grpc_config.host, internal_grpc_config.port
×
322
    );
×
323
    let (tx, rx) = dozer_types::crossbeam::channel::unbounded::<bool>();
×
324
    let pipeline_thread = std::thread::spawn(move || {
×
325
        dozer.build(true).unwrap();
×
326
        dozer.run_apps(shutdown_receiver, Some(tx), None)
×
327
    });
×
328
    let endpoint_name = endpoints[0].clone();
×
329

×
330
    let recv_res = rx.recv();
×
331
    if recv_res.is_err() {
×
332
        return match pipeline_thread.join() {
×
333
            Ok(Err(e)) => Err(e),
×
334
            Ok(Ok(())) => panic!("An error must have happened"),
×
335
            Err(e) => {
×
336
                std::panic::panic_any(e);
×
337
            }
×
338
        };
×
339
    }
×
340

×
341
    info!("Starting log reader {:?}", endpoint_name);
×
342

×
343
    let handle = std::thread::spawn(move || {
×
344
        runtime.block_on(async {
×
345
            let mut log_reader = LogReaderBuilder::new(
×
346
                app_server_addr,
×
347
                LogReaderOptions::new(endpoint_name.clone()),
×
348
            )
×
349
            .await
×
350
            .unwrap()
×
351
            .build(0, None);
×
352
            loop {
353
                let (op, _) = log_reader.next_op().await.unwrap();
×
354
                if let LogOperation::Op { op } = op {
×
355
                    let op = map_operation(endpoint_name.clone(), op);
×
356
                    sender.send(Ok(op)).await.unwrap();
×
357
                }
×
358
            }
359
        });
×
360

×
361
        pipeline_thread.join().unwrap().unwrap();
×
362
    });
×
363

×
364
    Ok(handle)
×
365
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc