• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/state.rs
1
use std::{collections::HashMap, sync::Arc, thread::JoinHandle};
2

3
use clap::Parser;
4
use dozer_core::{
5
    app::AppPipeline,
6
    dag_schemas::DagSchemas,
7
    petgraph::{
8
        dot,
9
        visit::{IntoEdgesDirected, IntoNodeReferences},
10
        Direction,
11
    },
12
    Dag, NodeKind,
13
};
14
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
15
use dozer_types::{
16
    grpc_types::live::{
17
        ConnectResponse, DotResponse, LiveApp, LiveResponse, RunRequest, SchemasResponse,
18
        SqlResponse,
19
    },
20
    indicatif::MultiProgress,
21
    log::info,
22
    models::{
23
        api_config::{ApiConfig, AppGrpcOptions},
24
        api_endpoint::ApiEndpoint,
25
        telemetry::{TelemetryConfig, TelemetryMetricsConfig},
26
    },
27
};
28
use tokio::{runtime::Runtime, sync::RwLock};
29

30
use crate::{
31
    cli::{init_dozer, types::Cli},
32
    errors::OrchestrationError,
33
    pipeline::PipelineBuilder,
34
    shutdown::{self, ShutdownReceiver, ShutdownSender},
35
    simple::{helper::validate_config, SimpleOrchestrator},
36
};
37

38
use super::{
39
    graph::{map_dag_schemas, transform_dag_ui},
40
    helper::map_schema,
41
    progress::progress_stream,
42
    LiveError,
43
};
44

×
45
/// An initialized orchestrator together with its lazily computed DAG schemas.
struct DozerAndSchemas {
    /// The orchestrator built from the current configuration.
    dozer: SimpleOrchestrator,
    /// Schemas of the pipeline DAG. This is `None` until first requested; it is reset to
    /// `None` whenever a new orchestrator is installed.
    schemas: Option<DagSchemas<SchemaSQLContext>>,
}
×
49

×
50
/// Shared state behind the live UI: the current orchestrator, the running pipeline,
/// the last error and the channel used to push updates to connected clients.
pub struct LiveState {
    /// The current orchestrator and its cached schemas; `None` until a build succeeds.
    dozer: RwLock<Option<DozerAndSchemas>>,
    /// Shutdown handle of the currently running pipeline; `None` when nothing is running.
    run_thread: RwLock<Option<ShutdownSender>>,
    /// Last error to report to the UI, if any.
    error_message: RwLock<Option<String>>,
    /// Broadcast channel to connected clients; `None` until `set_sender` is called.
    sender: RwLock<Option<tokio::sync::broadcast::Sender<ConnectResponse>>>,
}
56

×
57
impl LiveState {
58
    pub fn new() -> Self {
×
59
        Self {
×
60
            dozer: RwLock::new(None),
×
61
            run_thread: RwLock::new(None),
×
62
            sender: RwLock::new(None),
×
63
            error_message: RwLock::new(None),
×
64
        }
×
65
    }
×
66

×
67
    async fn create_dag_if_missing(&self) -> Result<(), LiveError> {
×
68
        let mut dozer_and_schema_lock = self.dozer.write().await;
×
69
        if let Some(dozer_and_schema) = dozer_and_schema_lock.as_mut() {
×
70
            if dozer_and_schema.schemas.is_none() {
×
71
                let dag = create_dag(&dozer_and_schema.dozer).await?;
×
72
                let schemas = DagSchemas::new(dag)?;
×
73
                dozer_and_schema.schemas = Some(schemas);
×
74
            }
×
75
        }
×
76
        Ok(())
×
77
    }
×
78

×
79
    pub async fn set_sender(&self, sender: tokio::sync::broadcast::Sender<ConnectResponse>) {
×
80
        *self.sender.write().await = Some(sender);
×
81
    }
×
82

×
83
    pub async fn broadcast(&self) {
×
84
        let sender = self.sender.read().await;
×
85
        info!("broadcasting current state");
×
86
        if let Some(sender) = sender.as_ref() {
×
87
            let res = self.get_current().await;
×
88
            // Ignore broadcast error.
×
89
            let _ = sender.send(ConnectResponse {
×
90
                live: Some(res),
×
91
                progress: None,
×
92
            });
×
93
        }
×
94
    }
×
95

×
96
    pub async fn set_dozer(&self, dozer: Option<SimpleOrchestrator>) {
×
97
        *self.dozer.write().await = dozer.map(|dozer| DozerAndSchemas {
×
98
            dozer,
×
99
            schemas: None,
×
100
        });
×
101
    }
×
102

×
103
    pub async fn set_error_message(&self, error_message: Option<String>) {
×
104
        *self.error_message.write().await = error_message;
×
105
    }
×
106

×
107
    pub async fn build(&self, runtime: Arc<Runtime>) -> Result<(), LiveError> {
×
108
        // Taking lock to ensure that we don't have multiple builds running at the same time
×
109
        let mut lock = self.dozer.write().await;
×
110

×
111
        let cli = Cli::parse();
×
112

×
113
        let dozer = init_dozer(
×
114
            runtime,
×
115
            cli.config_paths.clone(),
×
116
            cli.config_token.clone(),
×
117
            cli.config_overrides.clone(),
×
118
            cli.ignore_pipe,
×
119
            false,
×
120
        )
×
121
        .await?;
×
122

×
123
        *lock = Some(DozerAndSchemas {
×
124
            dozer,
×
125
            schemas: None,
×
126
        });
×
127
        Ok(())
×
128
    }
×
129
    pub async fn get_current(&self) -> LiveResponse {
×
130
        let dozer = self.dozer.read().await;
×
131
        let app = dozer.as_ref().map(|dozer| {
×
132
            let connections = dozer
×
133
                .dozer
×
134
                .config
×
135
                .connections
×
136
                .iter()
×
137
                .map(|c| c.name.clone())
×
138
                .collect();
×
139
            let endpoints = dozer
×
140
                .dozer
×
141
                .config
×
142
                .endpoints
×
143
                .iter()
×
144
                .map(|c| c.name.clone())
×
145
                .collect();
×
146
            LiveApp {
×
147
                app_name: dozer.dozer.config.app_name.clone(),
×
148
                connections,
×
149
                endpoints,
×
150
            }
×
151
        });
×
152

×
153
        LiveResponse {
×
154
            initialized: app.is_some(),
×
155
            running: self.run_thread.read().await.is_some(),
×
156
            error_message: self.error_message.read().await.as_ref().cloned(),
×
157
            app,
×
158
        }
×
159
    }
×
160

×
161
    pub async fn get_sql(&self) -> Result<SqlResponse, LiveError> {
×
162
        let dozer = self.dozer.read().await;
×
163
        let dozer = dozer.as_ref().ok_or(LiveError::NotInitialized)?;
×
164

×
165
        let sql = dozer.dozer.config.sql.clone().unwrap_or_default();
×
166
        Ok(SqlResponse { sql })
×
167
    }
×
168
    pub async fn get_endpoints_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
169
        self.create_dag_if_missing().await?;
×
170
        let dozer = self.dozer.read().await;
×
171
        let schemas = get_schemas(&dozer)?;
×
172

173
        Ok(get_endpoint_schemas(schemas))
×
174
    }
×
175
    pub async fn get_source_schemas(
×
176
        &self,
×
177
        connection_name: String,
×
178
    ) -> Result<SchemasResponse, LiveError> {
×
179
        self.create_dag_if_missing().await?;
×
180
        let dozer = self.dozer.read().await;
×
181
        let schemas = get_schemas(&dozer)?;
×
182

×
183
        get_source_schemas(schemas, connection_name)
×
184
    }
×
185

×
186
    pub async fn get_graph_schemas(&self) -> Result<SchemasResponse, LiveError> {
×
187
        self.create_dag_if_missing().await?;
×
188
        let dozer = self.dozer.read().await;
×
189
        let schemas = get_schemas(&dozer)?;
×
190

×
191
        Ok(SchemasResponse {
×
192
            schemas: map_dag_schemas(schemas),
×
193
        })
×
194
    }
×
195

196
    pub async fn generate_dot(&self) -> Result<DotResponse, LiveError> {
×
197
        self.create_dag_if_missing().await?;
×
198
        let dozer = self.dozer.read().await;
×
199
        let schemas = get_schemas(&dozer)?;
×
200

201
        Ok(generate_dot(schemas))
×
202
    }
×
203

×
204
    pub async fn run(&self, request: RunRequest) -> Result<(), LiveError> {
×
205
        let dozer = self.dozer.read().await;
×
206
        let dozer = &dozer.as_ref().ok_or(LiveError::NotInitialized)?.dozer;
×
207

×
208
        // kill if a handle already exists
×
209
        self.stop().await?;
×
210

×
211
        let (shutdown_sender, shutdown_receiver) = shutdown::new(&dozer.runtime);
×
212
        let metrics_shutdown = shutdown_receiver.clone();
×
213
        let _handle = run(dozer.clone(), request, shutdown_receiver)?;
×
214

×
215
        // Initialize progress
×
216
        let metrics_sender = self.sender.read().await.as_ref().unwrap().clone();
×
217
        tokio::spawn(async {
×
218
            progress_stream(metrics_sender, metrics_shutdown)
×
219
                .await
×
220
                .unwrap()
×
221
        });
×
222

×
223
        let mut lock = self.run_thread.write().await;
×
224
        *lock = Some(shutdown_sender);
×
225

×
226
        Ok(())
×
227
    }
×
228

×
229
    pub async fn stop(&self) -> Result<(), LiveError> {
×
230
        let mut lock = self.run_thread.write().await;
×
231
        if let Some(shutdown) = lock.take() {
×
232
            shutdown.shutdown()
×
233
        }
×
234
        *lock = None;
×
235
        Ok(())
×
236
    }
×
237
}
238

×
239
fn get_schemas(
×
240
    dozer_and_schema: &Option<DozerAndSchemas>,
×
241
) -> Result<&DagSchemas<SchemaSQLContext>, LiveError> {
×
242
    dozer_and_schema
×
243
        .as_ref()
×
244
        .ok_or(LiveError::NotInitialized)?
×
245
        .schemas
×
246
        .as_ref()
×
247
        .ok_or(LiveError::NotInitialized)
×
248
}
×
249

×
250
fn get_source_schemas(
×
251
    dag_schemas: &DagSchemas<SchemaSQLContext>,
×
252
    connection_name: String,
×
253
) -> Result<SchemasResponse, LiveError> {
×
254
    let graph = dag_schemas.graph();
×
255
    for (node_index, node) in graph.node_references() {
×
256
        if node.handle.id == connection_name {
×
257
            let NodeKind::Source(source) = &node.kind else {
×
258
                continue;
×
259
            };
×
260

×
261
            let mut schemas = HashMap::new();
×
262
            for edge in graph.edges_directed(node_index, Direction::Outgoing) {
×
263
                let edge = edge.weight();
×
264
                schemas.insert(
×
265
                    source.get_output_port_name(&edge.output_port),
×
266
                    map_schema(edge.schema.clone()),
×
267
                );
×
268
            }
×
269
            return Ok(SchemasResponse { schemas });
×
270
        }
×
271
    }
×
272

×
273
    Err(LiveError::ConnectionNotFound(connection_name))
×
274
}
×
275

276
async fn create_dag(
×
277
    dozer: &SimpleOrchestrator,
×
278
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
×
279
    // Calculate schemas.
×
280
    let endpoint_and_logs = dozer
×
281
        .config
×
282
        .endpoints
×
283
        .iter()
×
284
        // We're not really going to run the pipeline, so we don't create logs.
×
285
        .map(|endpoint| (endpoint.clone(), None))
×
286
        .collect();
×
287
    let builder = PipelineBuilder::new(
×
288
        &dozer.config.connections,
×
289
        &dozer.config.sources,
×
290
        dozer.config.sql.as_deref(),
×
291
        endpoint_and_logs,
×
292
        MultiProgress::new(),
×
293
    );
×
294
    builder.build(&dozer.runtime).await
×
295
}
×
296

×
297
fn get_endpoint_schemas(dag_schemas: &DagSchemas<SchemaSQLContext>) -> SchemasResponse {
×
298
    let schemas = dag_schemas.get_sink_schemas();
×
299

×
300
    let schemas = schemas
×
301
        .into_iter()
×
302
        .map(|(name, tuple)| {
×
303
            let (schema, _) = tuple;
×
304
            (name, map_schema(schema))
×
305
        })
×
306
        .collect();
×
307
    SchemasResponse { schemas }
×
308
}
×
309

×
310
fn generate_dot(dag: &DagSchemas<SchemaSQLContext>) -> DotResponse {
×
311
    let dot_str = dot::Dot::new(transform_dag_ui(dag.graph()).graph()).to_string();
×
312
    DotResponse { dot: dot_str }
×
313
}
×
314

×
315
fn run(
×
316
    dozer: SimpleOrchestrator,
×
317
    request: RunRequest,
×
318
    shutdown_receiver: ShutdownReceiver,
×
319
) -> Result<JoinHandle<()>, OrchestrationError> {
×
320
    let mut dozer = get_dozer_run_instance(dozer, request)?;
×
321

×
322
    validate_config(&dozer.config)?;
×
323

×
324
    let runtime = dozer.runtime.clone();
×
325
    let run_thread = std::thread::spawn(move || {
×
326
        dozer.build(true).unwrap();
×
327
        dozer.run_all(shutdown_receiver)
×
328
    });
×
329

×
330
    let handle = std::thread::spawn(move || {
×
331
        runtime.block_on(async {
×
332
            run_thread.join().unwrap().unwrap();
×
333
        });
×
334
    });
×
335

×
336
    Ok(handle)
×
337
}
×
338

339
fn get_dozer_run_instance(
340
    mut dozer: SimpleOrchestrator,
×
341
    req: RunRequest,
×
342
) -> Result<SimpleOrchestrator, LiveError> {
×
343
    match req.request {
×
344
        Some(dozer_types::grpc_types::live::run_request::Request::Sql(req)) => {
×
345
            let context = statement_to_pipeline(&req.sql, &mut AppPipeline::new(), None)
×
346
                .map_err(LiveError::PipelineError)?;
×
347

×
348
            //overwrite sql
×
349
            dozer.config.sql = Some(req.sql);
×
350

×
351
            dozer.config.endpoints = vec![];
×
352
            let endpoints = context.output_tables_map.keys().collect::<Vec<_>>();
×
353
            for endpoint in endpoints {
×
354
                let endpoint = ApiEndpoint {
×
355
                    name: endpoint.to_string(),
×
356
                    table_name: endpoint.to_string(),
×
357
                    path: format!("/{}", endpoint),
×
358
                    ..Default::default()
×
359
                };
×
360
                dozer.config.endpoints.push(endpoint);
×
361
            }
×
362
        }
×
363
        Some(dozer_types::grpc_types::live::run_request::Request::Source(req)) => {
×
364
            dozer.config.sql = None;
×
365
            dozer.config.endpoints = vec![];
×
366
            let endpoint = req.source;
×
367
            dozer.config.endpoints.push(ApiEndpoint {
×
368
                name: endpoint.to_string(),
×
369
                table_name: endpoint.to_string(),
×
370
                path: format!("/{}", endpoint),
×
371
                ..Default::default()
×
372
            });
×
373
        }
×
374
        None => {}
×
375
    };
376

377
    dozer.config.api = Some(ApiConfig {
×
378
        app_grpc: Some(AppGrpcOptions {
×
379
            port: 5678,
×
380
            host: "0.0.0.0".to_string(),
×
381
        }),
×
382

×
383
        ..Default::default()
×
384
    });
×
385

×
386
    dozer.config.home_dir = tempdir::TempDir::new("live")
×
387
        .unwrap()
×
388
        .into_path()
×
389
        .to_string_lossy()
×
390
        .to_string();
×
391

×
392
    dozer.config.telemetry = Some(TelemetryConfig {
×
393
        trace: None,
×
394
        metrics: Some(TelemetryMetricsConfig::Prometheus(())),
×
395
    });
×
396

×
397
    Ok(dozer)
×
398
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc