• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5869050642

pending completion
5869050642

Pull #1858

github

supergi0
updated readme
Pull Request #1858: feat: Implement graph for dozer-live ui

419 of 419 new or added lines in 15 files covered. (100.0%)

46002 of 59761 relevant lines covered (76.98%)

52423.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/state.rs
1
use std::thread::JoinHandle;
2

3
use clap::Parser;
4
use dozer_core::{app::AppPipeline, dag_schemas::DagSchemas, petgraph::dot, Dag};
5
use dozer_ingestion::connectors::get_connector;
6
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
7
use dozer_types::{
8
    grpc_types::live::{
9
        ConnectResponse, DotResponse, LiveApp, LiveResponse, RunRequest, SchemasResponse,
10
        SqlResponse,
11
    },
12
    indicatif::MultiProgress,
13
    log::info,
14
    models::{
15
        api_config::{ApiConfig, AppGrpcOptions},
16
        api_endpoint::ApiEndpoint,
17
        telemetry::{TelemetryConfig, TelemetryMetricsConfig},
18
    },
19
    parking_lot::RwLock,
20
};
21

22
use crate::{
23
    cli::{init_dozer, types::Cli},
24
    errors::OrchestrationError,
25
    pipeline::PipelineBuilder,
26
    shutdown::{self, ShutdownReceiver, ShutdownSender},
27
    simple::{helper::validate_config, SimpleOrchestrator},
28
};
29

30
use super::{
31
    graph::{map_dag_schemas, transform_dag_ui},
32
    helper::map_schema,
33
    progress::progress_stream,
34
    LiveError,
35
};
36

37
/// Shared state of the live server.
///
/// Each piece of state sits behind its own `RwLock` so that it can be read and
/// replaced independently of the others.
pub struct LiveState {
    /// Orchestrator for the current configuration; `None` until `build` (or
    /// `set_dozer`) stores one.
    dozer: RwLock<Option<SimpleOrchestrator>>,
    /// Shutdown handle of the pipeline started by `run`; taken and triggered by
    /// `stop`. Its presence is what `get_current` reports as `running`.
    run_thread: RwLock<Option<ShutdownSender>>,
    /// Error message set via `set_error_message` and included in every
    /// `LiveResponse` produced by `get_current`.
    error_message: RwLock<Option<String>>,
    /// Broadcast channel used to push state (`broadcast`) and run progress
    /// (`run`) to clients; `None` until `set_sender` is called.
    sender: RwLock<Option<tokio::sync::broadcast::Sender<ConnectResponse>>>,
}
43

44
impl LiveState {
×
45
    pub fn new() -> Self {
×
46
        Self {
×
47
            dozer: RwLock::new(None),
×
48
            run_thread: RwLock::new(None),
×
49
            sender: RwLock::new(None),
×
50
            error_message: RwLock::new(None),
×
51
        }
×
52
    }
×
53

×
54
    pub fn get_dozer(&self) -> Option<SimpleOrchestrator> {
×
55
        self.dozer.read().as_ref().cloned()
×
56
    }
×
57

58
    pub fn set_sender(&self, sender: tokio::sync::broadcast::Sender<ConnectResponse>) {
×
59
        let mut lock = self.sender.write();
×
60
        *lock = Some(sender);
×
61
    }
×
62

63
    pub fn broadcast(&self) -> Result<(), LiveError> {
×
64
        let sender = self.sender.read();
×
65
        info!("broadcasting current state");
×
66
        if let Some(sender) = sender.as_ref() {
×
67
            let res = self.get_current();
×
68
            return match sender.send(ConnectResponse {
×
69
                live: Some(res),
×
70
                progress: None,
×
71
            }) {
×
72
                Ok(_) => Ok(()),
×
73
                Err(e) => Err(LiveError::BoxedError(Box::new(e))),
×
74
            };
×
75
        }
×
76

×
77
        Ok(())
×
78
    }
×
79

×
80
    pub fn set_dozer(&self, dozer: Option<SimpleOrchestrator>) {
×
81
        let mut lock = self.dozer.write();
×
82
        *lock = dozer;
×
83
    }
×
84

×
85
    pub fn set_error_message(&self, error_message: Option<String>) {
×
86
        let mut lock = self.error_message.write();
×
87
        *lock = error_message;
×
88
    }
×
89

×
90
    pub fn build(&self) -> Result<(), LiveError> {
×
91
        // Taking lock to ensure that we don't have multiple builds running at the same time
×
92
        let mut lock = self.dozer.write();
×
93

×
94
        let cli = Cli::parse();
×
95

×
96
        let res = init_dozer(
×
97
            cli.config_paths.clone(),
×
98
            cli.config_token.clone(),
×
99
            cli.config_overrides.clone(),
×
100
            cli.ignore_pipe,
×
101
            None,
×
102
        )?;
×
103

104
        *lock = Some(res);
×
105
        Ok(())
×
106
    }
×
107
    pub fn get_current(&self) -> LiveResponse {
×
108
        let app = self.get_dozer().map(|dozer| {
×
109
            let connections = dozer
×
110
                .config
×
111
                .connections
×
112
                .into_iter()
×
113
                .map(|c| c.name)
×
114
                .collect();
×
115
            let endpoints = dozer.config.endpoints.into_iter().map(|c| c.name).collect();
×
116
            LiveApp {
×
117
                app_name: dozer.config.app_name,
×
118
                connections,
×
119
                endpoints,
×
120
            }
×
121
        });
×
122

×
123
        LiveResponse {
×
124
            initialized: app.is_some(),
×
125
            running: self.run_thread.read().is_some(),
×
126
            error_message: self.error_message.read().as_ref().cloned(),
×
127
            app,
×
128
        }
×
129
    }
×
130

×
131
    pub fn get_sql(&self) -> Result<SqlResponse, LiveError> {
×
132
        let dozer = self
×
133
            .get_dozer()
×
134
            .map_or(Err(LiveError::NotInitialized), Ok)?;
×
135
        let sql = dozer.config.sql.clone().unwrap_or_default();
×
136
        Ok(SqlResponse { sql })
×
137
    }
×
138
    pub fn get_endpoints_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
139
        let dozer = self
×
140
            .get_dozer()
×
141
            .map_or(Err(LiveError::NotInitialized), Ok)?;
×
142
        get_endpoint_schemas(dozer).map_err(|e| LiveError::BoxedError(Box::new(e)))
×
143
    }
×
144
    pub async fn get_source_schemas(
×
145
        &self,
×
146
        connection_name: String,
×
147
    ) -> Result<SchemasResponse, LiveError> {
×
148
        let dozer = self
×
149
            .get_dozer()
×
150
            .map_or(Err(LiveError::NotInitialized), Ok)?;
×
151
        get_source_schemas(dozer, connection_name)
×
152
            .await
×
153
            .map_err(|e| LiveError::BoxedError(Box::new(e)))
×
154
    }
×
155

×
156
    pub fn get_graph_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
157
        let dozer = self
×
158
            .get_dozer()
×
159
            .map_or(Err(LiveError::NotInitialized), Ok)?;
×
160
        get_graph_schemas(dozer).map_err(|e| LiveError::BoxedError(Box::new(e)))
×
161
    }
×
162

163
    pub fn generate_dot(&self) -> Result<DotResponse, LiveError> {
×
164
        let dozer = self
×
165
            .get_dozer()
×
166
            .map_or(Err(LiveError::NotInitialized), Ok)?;
×
167
        generate_dot(dozer).map_err(|e| LiveError::BoxedError(Box::new(e)))
×
168
    }
×
169

×
170
    pub fn run(&self, request: RunRequest) -> Result<(), LiveError> {
×
171
        let dozer = self
×
172
            .get_dozer()
×
173
            .map_or(Err(LiveError::NotInitialized), Ok)?;
×
174

×
175
        // kill if a handle already exists
×
176
        self.stop()?;
×
177

×
178
        let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
×
179
        let metrics_shutdown = shutdown_receiver.clone();
×
180
        let _handle = run(dozer, request, shutdown_receiver)
×
181
            .map_err(|e| LiveError::BoxedError(Box::new(e)))?;
×
182

×
183
        // Initialize progress
×
184
        let metrics_sender = self.sender.read().as_ref().unwrap().clone();
×
185
        tokio::spawn(async {
×
186
            progress_stream(metrics_sender, metrics_shutdown)
×
187
                .await
×
188
                .unwrap()
×
189
        });
×
190

×
191
        let mut lock = self.run_thread.write();
×
192
        *lock = Some(shutdown_sender);
×
193

×
194
        Ok(())
×
195
    }
×
196

×
197
    pub fn stop(&self) -> Result<(), LiveError> {
×
198
        let mut lock = self.run_thread.write();
×
199
        if let Some(shutdown) = lock.take() {
×
200
            shutdown.shutdown()
×
201
        }
×
202
        *lock = None;
×
203
        Ok(())
×
204
    }
×
205
}
×
206

×
207
pub async fn get_source_schemas(
×
208
    dozer: SimpleOrchestrator,
×
209
    connection_name: String,
×
210
) -> Result<SchemasResponse, OrchestrationError> {
×
211
    let connection = dozer
×
212
        .config
×
213
        .connections
×
214
        .iter()
×
215
        .find(|c| c.name == connection_name)
×
216
        .unwrap();
×
217

218
    let connector =
×
219
        get_connector(connection.clone()).map_err(|e| LiveError::BoxedError(Box::new(e)))?;
×
220

×
221
    let (tables, schemas) = connector
×
222
        .list_all_schemas()
×
223
        .await
×
224
        .map_err(|e| LiveError::BoxedError(Box::new(e)))?;
×
225

×
226
    let schemas = schemas
×
227
        .into_iter()
×
228
        .zip(tables)
×
229
        .map(|(s, table)| {
×
230
            let schema = s.schema;
×
231
            (table.name, map_schema(schema))
×
232
        })
×
233
        .collect();
×
234

×
235
    Ok(SchemasResponse { schemas })
×
236
}
×
237

238
/// Builds, without running, the pipeline DAG for the orchestrator's current
/// configuration (connections, sources, SQL and endpoints).
pub fn get_dag(dozer: SimpleOrchestrator) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
    let config = &dozer.config;

    // The pipeline is only built to be inspected, never executed, so no
    // endpoint gets a log attached.
    let endpoints_without_logs = config
        .endpoints
        .iter()
        .cloned()
        .map(|endpoint| (endpoint, None))
        .collect();

    PipelineBuilder::new(
        &config.connections,
        &config.sources,
        config.sql.as_deref(),
        endpoints_without_logs,
        MultiProgress::new(),
    )
    .build(dozer.runtime.clone())
}
×
256

257
pub fn get_endpoint_schemas(
×
258
    dozer: SimpleOrchestrator,
×
259
) -> Result<SchemasResponse, OrchestrationError> {
×
260
    let dag = get_dag(dozer)?;
×
261
    let dag_schemas = DagSchemas::new(dag)?;
×
262

×
263
    let schemas = dag_schemas.get_sink_schemas();
×
264

×
265
    let schemas = schemas
×
266
        .into_iter()
×
267
        .map(|(name, tuple)| {
×
268
            let (schema, _) = tuple;
×
269
            (name, map_schema(schema))
×
270
        })
×
271
        .collect();
×
272
    Ok(SchemasResponse { schemas })
×
273
}
×
274

275
pub fn generate_dot(dozer: SimpleOrchestrator) -> Result<DotResponse, OrchestrationError> {
×
276
    let dag = get_dag(dozer)?;
×
277
    let dag = transform_dag_ui(&dag);
×
278
    let dot_str = dot::Dot::new(dag.graph()).to_string();
×
279
    Ok(DotResponse { dot: dot_str })
×
280
}
×
281

×
282
pub fn get_graph_schemas(dozer: SimpleOrchestrator) -> Result<SchemasResponse, OrchestrationError> {
×
283
    let dag = get_dag(dozer)?;
×
284
    let dag_schemas = DagSchemas::new(dag)?;
×
285

×
286
    Ok(SchemasResponse {
×
287
        schemas: map_dag_schemas(dag_schemas),
×
288
    })
×
289
}
×
290

×
291
pub fn run(
×
292
    dozer: SimpleOrchestrator,
×
293
    request: RunRequest,
×
294
    shutdown_receiver: ShutdownReceiver,
×
295
) -> Result<JoinHandle<()>, OrchestrationError> {
×
296
    let mut dozer = get_dozer_run_instance(dozer, request)?;
×
297

×
298
    validate_config(&dozer.config).map_err(|e| LiveError::BoxedError(Box::new(e)))?;
×
299

×
300
    let runtime = dozer.runtime.clone();
×
301
    let run_thread = std::thread::spawn(move || {
×
302
        dozer.build(true).unwrap();
×
303
        dozer.run_all(shutdown_receiver)
×
304
    });
×
305

×
306
    let handle = std::thread::spawn(move || {
×
307
        runtime.block_on(async {
×
308
            run_thread.join().unwrap().unwrap();
×
309
        });
×
310
    });
×
311

×
312
    Ok(handle)
×
313
}
×
314

×
315
fn get_dozer_run_instance(
×
316
    dozer: SimpleOrchestrator,
×
317
    req: RunRequest,
×
318
) -> Result<SimpleOrchestrator, LiveError> {
×
319
    let mut dozer = dozer;
×
320

×
321
    match req.request {
×
322
        Some(dozer_types::grpc_types::live::run_request::Request::Sql(req)) => {
×
323
            let context = statement_to_pipeline(&req.sql, &mut AppPipeline::new(), None)
×
324
                .map_err(LiveError::PipelineError)?;
×
325

×
326
            //overwrite sql
×
327
            dozer.config.sql = Some(req.sql);
×
328

×
329
            dozer.config.endpoints = vec![];
×
330
            let endpoints = context.output_tables_map.keys().collect::<Vec<_>>();
×
331
            for endpoint in endpoints {
×
332
                let endpoint = ApiEndpoint {
×
333
                    name: endpoint.to_string(),
×
334
                    table_name: endpoint.to_string(),
×
335
                    path: format!("/{}", endpoint),
×
336
                    ..Default::default()
×
337
                };
×
338
                dozer.config.endpoints.push(endpoint);
×
339
            }
×
340
        }
×
341
        Some(dozer_types::grpc_types::live::run_request::Request::Source(req)) => {
×
342
            dozer.config.sql = None;
×
343
            dozer.config.endpoints = vec![];
×
344
            let endpoint = req.source;
×
345
            dozer.config.endpoints.push(ApiEndpoint {
×
346
                name: endpoint.to_string(),
×
347
                table_name: endpoint.to_string(),
×
348
                path: format!("/{}", endpoint),
×
349
                ..Default::default()
×
350
            });
×
351
        }
×
352
        None => {}
×
353
    };
354

×
355
    dozer.config.api = Some(ApiConfig {
×
356
        app_grpc: Some(AppGrpcOptions {
×
357
            port: 5678,
×
358
            host: "0.0.0.0".to_string(),
×
359
        }),
×
360

×
361
        ..Default::default()
×
362
    });
×
363

×
364
    dozer.config.home_dir = tempdir::TempDir::new("live")
×
365
        .unwrap()
×
366
        .into_path()
×
367
        .to_string_lossy()
×
368
        .to_string();
×
369

×
370
    dozer.config.telemetry = Some(TelemetryConfig {
×
371
        trace: None,
×
372
        metrics: Some(TelemetryMetricsConfig::Prometheus(())),
×
373
    });
×
374

×
375
    Ok(dozer)
×
376
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc